commit
922134aabb
|
@ -225,17 +225,17 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
|
|||
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
|
||||
vnodeObj.refCount = 1;
|
||||
vnodeObj.version = 0;
|
||||
vnodeObj.wworker = dnodeAllocateWriteWorker();
|
||||
vnodeObj.rworker = dnodeAllocateReadWorker();
|
||||
vnodeObj.wal = NULL;
|
||||
vnodeObj.tsdb = pTsdb;
|
||||
vnodeObj.replica = NULL;
|
||||
vnodeObj.events = NULL;
|
||||
vnodeObj.cq = NULL;
|
||||
|
||||
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
|
||||
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
|
||||
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
|
||||
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
|
||||
|
||||
dTrace("open vnode:%d in %s", vnodeObj.vgId, rootDir);
|
||||
dTrace("open vnode:%d in %s", pVnode->vgId, rootDir);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -314,17 +314,17 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
|
||||
vnodeObj.refCount = 1;
|
||||
vnodeObj.version = 0;
|
||||
vnodeObj.wworker = dnodeAllocateWriteWorker();
|
||||
vnodeObj.rworker = dnodeAllocateReadWorker();
|
||||
vnodeObj.wal = NULL;
|
||||
vnodeObj.tsdb = pTsdb;
|
||||
vnodeObj.replica = NULL;
|
||||
vnodeObj.events = NULL;
|
||||
vnodeObj.cq = NULL;
|
||||
|
||||
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
|
||||
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
|
||||
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
|
||||
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
|
||||
|
||||
dPrint("vgroup:%d, vnode:%d is created", vnodeObj.vgId, vnodeObj.vgId);
|
||||
dPrint("vgroup:%d, vnode:%d is created", pVnode->vgId, pVnode->vgId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -36,16 +36,15 @@ typedef struct {
|
|||
void *pCont;
|
||||
int32_t contLen;
|
||||
SRpcMsg rpcMsg;
|
||||
void *pVnode;
|
||||
SRpcContext *pRpcContext; // RPC message context
|
||||
} SReadMsg;
|
||||
|
||||
static void *dnodeProcessReadQueue(void *param);
|
||||
static void dnodeProcessReadResult(SReadMsg *pRead);
|
||||
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead);
|
||||
static void dnodeHandleIdleReadWorker();
|
||||
static void dnodeProcessQueryMsg(SReadMsg *pMsg);
|
||||
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg);
|
||||
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode);
|
||||
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg);
|
||||
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg);
|
||||
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode);
|
||||
|
||||
// module global variable
|
||||
static taos_qset readQset;
|
||||
|
@ -104,23 +103,19 @@ void dnodeRead(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
// put message into queue
|
||||
SReadMsg readMsg = {
|
||||
.rpcMsg = *pMsg,
|
||||
.pCont = pCont,
|
||||
.contLen = pHead->contLen,
|
||||
.pRpcContext = pRpcContext,
|
||||
.pVnode = pVnode,
|
||||
};
|
||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||
pRead->rpcMsg = *pMsg;
|
||||
pRead->pCont = pCont;
|
||||
pRead->contLen = pHead->contLen;
|
||||
pRead->pRpcContext = pRpcContext;
|
||||
|
||||
taos_queue queue = dnodeGetVnodeRworker(pVnode);
|
||||
taosWriteQitem(queue, &readMsg);
|
||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
||||
|
||||
// next vnode
|
||||
leftLen -= pHead->contLen;
|
||||
pCont -= pHead->contLen;
|
||||
queuedMsgNum++;
|
||||
|
||||
dnodeReleaseVnode(pVnode);
|
||||
}
|
||||
|
||||
if (queuedMsgNum == 0) {
|
||||
|
@ -135,11 +130,11 @@ void dnodeRead(SRpcMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void *dnodeAllocateReadWorker() {
|
||||
void *dnodeAllocateReadWorker(void *pVnode) {
|
||||
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
|
||||
if (queue == NULL) return NULL;
|
||||
|
||||
taosAddIntoQset(readQset, queue);
|
||||
taosAddIntoQset(readQset, queue, pVnode);
|
||||
|
||||
// spawn a thread to process queue
|
||||
if (threads < maxThreads) {
|
||||
|
@ -164,20 +159,27 @@ void dnodeFreeReadWorker(void *rqueue) {
|
|||
|
||||
static void *dnodeProcessReadQueue(void *param) {
|
||||
taos_qset qset = (taos_qset)param;
|
||||
SReadMsg readMsg;
|
||||
SReadMsg *pReadMsg;
|
||||
int type;
|
||||
void *pVnode;
|
||||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(qset, &readMsg) <= 0) {
|
||||
if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) {
|
||||
dnodeHandleIdleReadWorker();
|
||||
continue;
|
||||
}
|
||||
|
||||
terrno = 0;
|
||||
if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) {
|
||||
(*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg);
|
||||
if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) {
|
||||
(*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg);
|
||||
} else {
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
}
|
||||
|
||||
dnodeProcessReadResult(pVnode, pReadMsg);
|
||||
taosFreeQitem(pReadMsg);
|
||||
|
||||
dnodeReleaseVnode(pVnode);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -195,11 +197,11 @@ static void dnodeHandleIdleReadWorker() {
|
|||
}
|
||||
}
|
||||
|
||||
static void dnodeProcessReadResult(SReadMsg *pRead) {
|
||||
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
|
||||
SRpcContext *pRpcContext = pRead->pRpcContext;
|
||||
int32_t code = 0;
|
||||
|
||||
dnodeReleaseVnode(pRead->pVnode);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
|
||||
if (pRpcContext) {
|
||||
if (terrno) {
|
||||
|
@ -218,19 +220,23 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
|
|||
code = terrno;
|
||||
}
|
||||
|
||||
SRpcMsg rsp;
|
||||
rsp.handle = pRead->rpcMsg.handle;
|
||||
rsp.code = code;
|
||||
rsp.pCont = NULL;
|
||||
rpcSendResponse(&rsp);
|
||||
//TODO: query handle is returned by dnodeProcessQueryMsg
|
||||
if (0) {
|
||||
SRpcMsg rsp;
|
||||
rsp.handle = pRead->rpcMsg.handle;
|
||||
rsp.code = code;
|
||||
rsp.pCont = NULL;
|
||||
rpcSendResponse(&rsp);
|
||||
}
|
||||
|
||||
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
|
||||
}
|
||||
|
||||
static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
|
||||
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
|
||||
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
|
||||
|
||||
SQInfo* pQInfo = NULL;
|
||||
void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
void* tsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
|
||||
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
|
@ -252,7 +258,7 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t c = 0;
|
||||
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
|
||||
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
||||
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
|
||||
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ typedef struct _write {
|
|||
void *pCont;
|
||||
int32_t contLen;
|
||||
SRpcMsg rpcMsg;
|
||||
void *pVnode; // pointer to vnode
|
||||
SRpcContext *pRpcContext; // RPC message context
|
||||
} SWriteMsg;
|
||||
|
||||
|
@ -51,15 +50,15 @@ typedef struct _thread_obj {
|
|||
SWriteWorker *writeWorker;
|
||||
} SWriteWorkerPool;
|
||||
|
||||
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *);
|
||||
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *);
|
||||
static void *dnodeProcessWriteQueue(void *param);
|
||||
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
|
||||
static void dnodeProcessWriteResult(SWriteMsg *pWrite);
|
||||
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg);
|
||||
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg);
|
||||
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg);
|
||||
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg);
|
||||
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg);
|
||||
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite);
|
||||
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg);
|
||||
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg);
|
||||
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg);
|
||||
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg);
|
||||
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg);
|
||||
|
||||
SWriteWorkerPool wWorkerPool;
|
||||
|
||||
|
@ -116,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
// put message into queue
|
||||
SWriteMsg writeMsg;
|
||||
writeMsg.rpcMsg = *pMsg;
|
||||
writeMsg.pCont = pCont;
|
||||
writeMsg.contLen = pHead->contLen;
|
||||
writeMsg.pRpcContext = pRpcContext;
|
||||
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
|
||||
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
||||
pWrite->rpcMsg = *pMsg;
|
||||
pWrite->pCont = pCont;
|
||||
pWrite->contLen = pHead->contLen;
|
||||
pWrite->pRpcContext = pRpcContext;
|
||||
|
||||
taos_queue queue = dnodeGetVnodeWworker(pVnode);
|
||||
taosWriteQitem(queue, &writeMsg);
|
||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
|
||||
|
||||
// next vnode
|
||||
leftLen -= pHead->contLen;
|
||||
|
@ -144,16 +142,16 @@ void dnodeWrite(SRpcMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void *dnodeAllocateWriteWorker() {
|
||||
void *dnodeAllocateWriteWorker(void *pVnode) {
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
|
||||
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
|
||||
taos_queue *queue = taosOpenQueue();
|
||||
if (queue == NULL) return NULL;
|
||||
|
||||
if (pWorker->qset == NULL) {
|
||||
pWorker->qset = taosOpenQset();
|
||||
if (pWorker->qset == NULL) return NULL;
|
||||
|
||||
taosAddIntoQset(pWorker->qset, queue);
|
||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
|
@ -165,7 +163,7 @@ void *dnodeAllocateWriteWorker() {
|
|||
taosCloseQset(pWorker->qset);
|
||||
}
|
||||
} else {
|
||||
taosAddIntoQset(pWorker->qset, queue);
|
||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
}
|
||||
|
||||
|
@ -181,53 +179,57 @@ void dnodeFreeWriteWorker(void *wqueue) {
|
|||
static void *dnodeProcessWriteQueue(void *param) {
|
||||
SWriteWorker *pWorker = (SWriteWorker *)param;
|
||||
taos_qall qall;
|
||||
SWriteMsg writeMsg;
|
||||
SWriteMsg *pWriteMsg;
|
||||
int32_t numOfMsgs;
|
||||
int type;
|
||||
void *pVnode;
|
||||
|
||||
qall = taosAllocateQall();
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall);
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode);
|
||||
if (numOfMsgs <=0) {
|
||||
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t i=0; i<numOfMsgs; ++i) {
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
// retrieve all items, and write them into WAL
|
||||
taosGetQitem(qall, &writeMsg);
|
||||
taosGetQitem(qall, &type, (void **)&pWriteMsg);
|
||||
|
||||
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
|
||||
}
|
||||
|
||||
|
||||
// flush WAL file
|
||||
// walFsync(pVnode->whandle);
|
||||
|
||||
// browse all items, and process them one by one
|
||||
taosResetQitems(qall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, &writeMsg);
|
||||
taosGetQitem(qall, &type, (void **)&pWriteMsg);
|
||||
|
||||
terrno = 0;
|
||||
if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) {
|
||||
(*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg);
|
||||
if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) {
|
||||
(*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg);
|
||||
} else {
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
}
|
||||
|
||||
dnodeProcessWriteResult(&writeMsg);
|
||||
dnodeProcessWriteResult(pVnode, pWriteMsg);
|
||||
taosFreeQitem(pWriteMsg);
|
||||
}
|
||||
|
||||
// free the Qitems;
|
||||
taosFreeQitems(qall);
|
||||
}
|
||||
|
||||
taosFreeQall(qall);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void dnodeProcessWriteResult(SWriteMsg *pWrite) {
|
||||
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) {
|
||||
SRpcContext *pRpcContext = pWrite->pRpcContext;
|
||||
int32_t code = 0;
|
||||
|
||||
dnodeReleaseVnode(pWrite->pVnode);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
|
||||
if (pRpcContext) {
|
||||
if (terrno) {
|
||||
|
@ -267,7 +269,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
|||
}
|
||||
}
|
||||
|
||||
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
|
||||
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||
dTrace("submit msg is disposed");
|
||||
|
||||
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
|
||||
|
@ -276,7 +278,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
|
|||
pRsp->affectedRows = htonl(1);
|
||||
pRsp->numOfFailedBlocks = 0;
|
||||
|
||||
void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
void* tsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
assert(tsdb != NULL);
|
||||
|
||||
tsdbInsertData(tsdb, pMsg->pCont);
|
||||
|
@ -292,7 +294,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
|
|||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
|
||||
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
|
@ -341,16 +343,16 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
|
|||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||
}
|
||||
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
|
||||
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
|
||||
dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
|
||||
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
|
@ -360,16 +362,16 @@ static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
|
|||
.tid = htonl(pTable->sid)
|
||||
};
|
||||
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
|
||||
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
|
||||
dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
|
||||
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
|
@ -418,16 +420,16 @@ static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
|
|||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||
}
|
||||
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
|
||||
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
|
||||
dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
|
||||
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
|
@ -439,7 +441,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
|
|||
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
|
||||
|
||||
rpcRsp.code = TSDB_CODE_SUCCESS;
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
|
||||
dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
|
|
@ -176,7 +176,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
#define TSDB_MAX_COLUMNS 256
|
||||
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
|
||||
|
||||
#define TSDB_DNODE_NAME_LEN 63
|
||||
#define TSDB_DNODE_NAME_LEN 64
|
||||
#define TSDB_TABLE_NAME_LEN 192
|
||||
#define TSDB_DB_NAME_LEN 32
|
||||
#define TSDB_COL_NAME_LEN 64
|
||||
|
|
|
@ -45,7 +45,8 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable);
|
|||
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
|
||||
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
|
||||
|
||||
void mgmtDropAllSuperTables(SDbObj *pDropDb);
|
||||
void mgmtDropAllSuperTables(SDbObj *pDropDb);
|
||||
int32_t mgmtExtractTableName(const char* tableId, char* name);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -524,7 +524,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
|
|||
}
|
||||
|
||||
memset(stableName, 0, tListLen(stableName));
|
||||
extractTableName(pTable->tableId, stableName);
|
||||
mgmtExtractTableName(pTable->tableId, stableName);
|
||||
|
||||
if (pShow->payloadLen > 0 &&
|
||||
patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
|
||||
|
@ -624,3 +624,17 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mgmtExtractTableName(const char* tableId, char* name) {
|
||||
int pos = -1;
|
||||
int num = 0;
|
||||
for (pos = 0; tableId[pos] != 0; ++pos) {
|
||||
if (tableId[pos] == '.') num++;
|
||||
if (num == 2) break;
|
||||
}
|
||||
|
||||
if (num == 2) {
|
||||
strcpy(name, tableId + pos + 1);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -310,7 +310,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
|
|||
numOfRead++;
|
||||
|
||||
// pattern compare for meter name
|
||||
extractTableName(tableId, tableName);
|
||||
mgmtExtractTableName(tableId, tableName);
|
||||
|
||||
if (pShow->payloadLen > 0 &&
|
||||
patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
|
||||
|
@ -333,7 +333,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
|
|||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
if (superTableId != NULL) {
|
||||
extractTableName(superTableId, pWrite);
|
||||
mgmtExtractTableName(superTableId, pWrite);
|
||||
}
|
||||
cols++;
|
||||
|
||||
|
|
|
@ -28,10 +28,13 @@ void *qhandle = NULL;
|
|||
void processShellMsg() {
|
||||
static int num = 0;
|
||||
taos_qall qall;
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg *pRpcMsg, rpcMsg;
|
||||
int type;
|
||||
|
||||
qall = taosAllocateQall();
|
||||
|
||||
while (1) {
|
||||
int numOfMsgs = taosReadAllQitems(qhandle, &qall);
|
||||
int numOfMsgs = taosReadAllQitems(qhandle, qall);
|
||||
if (numOfMsgs <= 0) {
|
||||
usleep(1000);
|
||||
continue;
|
||||
|
@ -40,10 +43,10 @@ void processShellMsg() {
|
|||
tTrace("%d shell msgs are received", numOfMsgs);
|
||||
|
||||
for (int i=0; i<numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, &rpcMsg);
|
||||
taosGetQitem(qall, &type, (void **)&pRpcMsg);
|
||||
|
||||
if (dataFd >=0) {
|
||||
if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) {
|
||||
if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) {
|
||||
tPrint("failed to write data file, reason:%s", strerror(errno));
|
||||
}
|
||||
}
|
||||
|
@ -62,19 +65,22 @@ void processShellMsg() {
|
|||
|
||||
taosResetQitems(qall);
|
||||
for (int i=0; i<numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, &rpcMsg);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
taosGetQitem(qall, &type, (void **)&pRpcMsg);
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
|
||||
rpcMsg.pCont = rpcMallocCont(msgSize);
|
||||
rpcMsg.contLen = msgSize;
|
||||
rpcMsg.handle = rpcMsg.handle;
|
||||
rpcMsg.handle = pRpcMsg->handle;
|
||||
rpcMsg.code = 1;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
|
||||
taosFreeQitem(pRpcMsg);
|
||||
}
|
||||
|
||||
taosFreeQitems(qall);
|
||||
}
|
||||
|
||||
taosFreeQall(qall);
|
||||
/*
|
||||
SRpcIpSet ipSet;
|
||||
ipSet.numOfIps = 1;
|
||||
|
@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
|
|||
}
|
||||
|
||||
void processRequestMsg(SRpcMsg *pMsg) {
|
||||
tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen);
|
||||
taosWriteQitem(qhandle, pMsg);
|
||||
SRpcMsg *pTemp;
|
||||
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
||||
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
|
@ -143,6 +154,7 @@ int main(int argc, char *argv[]) {
|
|||
commit = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
ddebugFlag = rpcDebugFlag;
|
||||
uDebugFlag = rpcDebugFlag;
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
|
|
|
@ -78,7 +78,7 @@ void taosResetLogFile();
|
|||
// utility log function
|
||||
#define pError(...) \
|
||||
if (uDebugFlag & DEBUG_ERROR) { \
|
||||
tprintf("ERROR UTL ", 255, __VA_ARGS__); \
|
||||
tprintf("ERROR UTL ", uDebugFlag, __VA_ARGS__); \
|
||||
}
|
||||
#define pWarn(...) \
|
||||
if (uDebugFlag & DEBUG_WARN) { \
|
||||
|
|
|
@ -20,28 +20,35 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define TAOS_QTYPE_RPC 0
|
||||
#define TAOS_QTYPE_FWD 1
|
||||
#define TAOS_QTYPE_WAL 2
|
||||
|
||||
typedef void* taos_queue;
|
||||
typedef void* taos_qset;
|
||||
typedef void* taos_qall;
|
||||
|
||||
taos_queue taosOpenQueue(int itemSize);
|
||||
taos_queue taosOpenQueue();
|
||||
void taosCloseQueue(taos_queue);
|
||||
int taosWriteQitem(taos_queue, void *item);
|
||||
int taosReadQitem(taos_queue, void *item);
|
||||
void *taosAllocateQitem(int size);
|
||||
void taosFreeQitem(void *item);
|
||||
int taosWriteQitem(taos_queue, int type, void *item);
|
||||
int taosReadQitem(taos_queue, int *type, void **pitem);
|
||||
|
||||
int taosReadAllQitems(taos_queue, taos_qall *);
|
||||
int taosGetQitem(taos_qall, void *item);
|
||||
taos_qall taosAllocateQall();
|
||||
void taosFreeQall(taos_qall);
|
||||
int taosReadAllQitems(taos_queue, taos_qall);
|
||||
int taosGetQitem(taos_qall, int *type, void **pitem);
|
||||
void taosResetQitems(taos_qall);
|
||||
void taosFreeQitems(taos_qall);
|
||||
|
||||
taos_qset taosOpenQset();
|
||||
void taosCloseQset();
|
||||
int taosAddIntoQset(taos_qset, taos_queue);
|
||||
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
|
||||
void taosRemoveFromQset(taos_qset, taos_queue);
|
||||
int taosGetQueueNumber(taos_qset);
|
||||
|
||||
int taosReadQitemFromQset(taos_qset, void *item);
|
||||
int taosReadAllQitemsFromQset(taos_qset, taos_qall *);
|
||||
int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
|
||||
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle);
|
||||
|
||||
int taosGetQueueItemsNumber(taos_queue param);
|
||||
int taosGetQsetItemsNumber(taos_qset param);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "tqueue.h"
|
||||
|
||||
typedef struct _taos_qnode {
|
||||
int type;
|
||||
struct _taos_qnode *next;
|
||||
char item[];
|
||||
} STaosQnode;
|
||||
|
@ -30,6 +31,7 @@ typedef struct _taos_q {
|
|||
struct _taos_qnode *tail;
|
||||
struct _taos_q *next; // for queue set
|
||||
struct _taos_qset *qset; // for queue set
|
||||
void *ahandle; // for queue set
|
||||
pthread_mutex_t mutex;
|
||||
} STaosQueue;
|
||||
|
||||
|
@ -48,7 +50,7 @@ typedef struct _taos_qall {
|
|||
int32_t numOfItems;
|
||||
} STaosQall;
|
||||
|
||||
taos_queue taosOpenQueue(int itemSize) {
|
||||
taos_queue taosOpenQueue() {
|
||||
|
||||
STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
|
||||
if (queue == NULL) {
|
||||
|
@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) {
|
|||
}
|
||||
|
||||
pthread_mutex_init(&queue->mutex, NULL);
|
||||
queue->itemSize = (int32_t)itemSize;
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
@ -83,16 +83,26 @@ void taosCloseQueue(taos_queue param) {
|
|||
free(queue);
|
||||
}
|
||||
|
||||
int taosWriteQitem(taos_queue param, void *item) {
|
||||
void *taosAllocateQitem(int size) {
|
||||
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
|
||||
if (pNode == NULL) return NULL;
|
||||
return (void *)pNode->item;
|
||||
}
|
||||
|
||||
void taosFreeQitem(void *param) {
|
||||
if (param == NULL) return;
|
||||
|
||||
//pTrace("item:%p is freed", param);
|
||||
|
||||
char *temp = (char *)param;
|
||||
temp -= sizeof(STaosQnode);
|
||||
free(temp);
|
||||
}
|
||||
|
||||
int taosWriteQitem(taos_queue param, int type, void *item) {
|
||||
STaosQueue *queue = (STaosQueue *)param;
|
||||
|
||||
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1);
|
||||
if ( pNode == NULL ) {
|
||||
terrno = TSDB_CODE_NO_RESOURCE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pNode->item, item, queue->itemSize);
|
||||
STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
|
||||
pNode->type = type;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
||||
|
@ -107,12 +117,14 @@ int taosWriteQitem(taos_queue param, void *item) {
|
|||
queue->numOfItems++;
|
||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||
|
||||
//pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems);
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int taosReadQitem(taos_queue param, void *item) {
|
||||
int taosReadQitem(taos_queue param, int *type, void **pitem) {
|
||||
STaosQueue *queue = (STaosQueue *)param;
|
||||
STaosQnode *pNode = NULL;
|
||||
int code = 0;
|
||||
|
@ -121,14 +133,15 @@ int taosReadQitem(taos_queue param, void *item) {
|
|||
|
||||
if (queue->head) {
|
||||
pNode = queue->head;
|
||||
memcpy(item, pNode->item, queue->itemSize);
|
||||
*pitem = pNode->item;
|
||||
*type = pNode->type;
|
||||
queue->head = pNode->next;
|
||||
if (queue->head == NULL)
|
||||
queue->tail = NULL;
|
||||
free(pNode);
|
||||
queue->numOfItems--;
|
||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||
code = 1;
|
||||
//pTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
@ -136,39 +149,42 @@ int taosReadQitem(taos_queue param, void *item) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int taosReadAllQitems(taos_queue param, taos_qall *res) {
|
||||
void *taosAllocateQall() {
|
||||
void *p = malloc(sizeof(STaosQall));
|
||||
return p;
|
||||
}
|
||||
|
||||
void taosFreeQall(void *param) {
|
||||
free(param);
|
||||
}
|
||||
|
||||
int taosReadAllQitems(taos_queue param, taos_qall p2) {
|
||||
STaosQueue *queue = (STaosQueue *)param;
|
||||
STaosQall *qall = NULL;
|
||||
STaosQall *qall = (STaosQall *)p2;
|
||||
int code = 0;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
||||
if (queue->head) {
|
||||
qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
|
||||
if ( qall == NULL ) {
|
||||
terrno = TSDB_CODE_NO_RESOURCE;
|
||||
code = -1;
|
||||
} else {
|
||||
qall->current = queue->head;
|
||||
qall->start = queue->head;
|
||||
qall->numOfItems = queue->numOfItems;
|
||||
qall->itemSize = queue->itemSize;
|
||||
code = qall->numOfItems;
|
||||
memset(qall, 0, sizeof(STaosQall));
|
||||
qall->current = queue->head;
|
||||
qall->start = queue->head;
|
||||
qall->numOfItems = queue->numOfItems;
|
||||
qall->itemSize = queue->itemSize;
|
||||
code = qall->numOfItems;
|
||||
|
||||
queue->head = NULL;
|
||||
queue->tail = NULL;
|
||||
queue->numOfItems = 0;
|
||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
||||
}
|
||||
queue->head = NULL;
|
||||
queue->tail = NULL;
|
||||
queue->numOfItems = 0;
|
||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
||||
*res = qall;
|
||||
return code;
|
||||
}
|
||||
|
||||
int taosGetQitem(taos_qall param, void *item) {
|
||||
int taosGetQitem(taos_qall param, int *type, void **pitem) {
|
||||
STaosQall *qall = (STaosQall *)param;
|
||||
STaosQnode *pNode;
|
||||
int num = 0;
|
||||
|
@ -178,8 +194,10 @@ int taosGetQitem(taos_qall param, void *item) {
|
|||
qall->current = pNode->next;
|
||||
|
||||
if (pNode) {
|
||||
memcpy(item, pNode->item, qall->itemSize);
|
||||
*pitem = pNode->item;
|
||||
*type = pNode->type;
|
||||
num = 1;
|
||||
//pTrace("item:%p is fetched", *pitem);
|
||||
}
|
||||
|
||||
return num;
|
||||
|
@ -190,19 +208,6 @@ void taosResetQitems(taos_qall param) {
|
|||
qall->current = qall->start;
|
||||
}
|
||||
|
||||
void taosFreeQitems(taos_qall param) {
|
||||
STaosQall *qall = (STaosQall *)param;
|
||||
STaosQnode *pNode;
|
||||
|
||||
while (qall->current) {
|
||||
pNode = qall->current;
|
||||
qall->current = pNode->next;
|
||||
free(pNode);
|
||||
}
|
||||
|
||||
free(qall);
|
||||
}
|
||||
|
||||
taos_qset taosOpenQset() {
|
||||
|
||||
STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
|
||||
|
@ -221,7 +226,7 @@ void taosCloseQset(taos_qset param) {
|
|||
free(qset);
|
||||
}
|
||||
|
||||
int taosAddIntoQset(taos_qset p1, taos_queue p2) {
|
||||
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
|
||||
STaosQueue *queue = (STaosQueue *)p2;
|
||||
STaosQset *qset = (STaosQset *)p1;
|
||||
|
||||
|
@ -230,6 +235,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) {
|
|||
pthread_mutex_lock(&qset->mutex);
|
||||
|
||||
queue->next = qset->head;
|
||||
queue->ahandle = ahandle;
|
||||
qset->head = queue;
|
||||
qset->numOfQueues++;
|
||||
|
||||
|
@ -283,7 +289,7 @@ int taosGetQueueNumber(taos_qset param) {
|
|||
return ((STaosQset *)param)->numOfQueues;
|
||||
}
|
||||
|
||||
int taosReadQitemFromQset(taos_qset param, void *item) {
|
||||
int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) {
|
||||
STaosQset *qset = (STaosQset *)param;
|
||||
STaosQnode *pNode = NULL;
|
||||
int code = 0;
|
||||
|
@ -301,11 +307,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
|
|||
|
||||
if (queue->head) {
|
||||
pNode = queue->head;
|
||||
memcpy(item, pNode->item, queue->itemSize);
|
||||
*pitem = pNode->item;
|
||||
*type = pNode->type;
|
||||
*phandle = queue->ahandle;
|
||||
queue->head = pNode->next;
|
||||
if (queue->head == NULL)
|
||||
queue->tail = NULL;
|
||||
free(pNode);
|
||||
queue->numOfItems--;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||
code = 1;
|
||||
|
@ -318,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
|
||||
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
||||
STaosQset *qset = (STaosQset *)param;
|
||||
STaosQueue *queue;
|
||||
STaosQall *qall = NULL;
|
||||
STaosQall *qall = (STaosQall *)p2;
|
||||
int code = 0;
|
||||
|
||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||
|
@ -336,22 +343,17 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
|
|||
pthread_mutex_lock(&queue->mutex);
|
||||
|
||||
if (queue->head) {
|
||||
qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
|
||||
if (qall == NULL) {
|
||||
terrno = TSDB_CODE_NO_RESOURCE;
|
||||
code = -1;
|
||||
} else {
|
||||
qall->current = queue->head;
|
||||
qall->start = queue->head;
|
||||
qall->numOfItems = queue->numOfItems;
|
||||
qall->itemSize = queue->itemSize;
|
||||
code = qall->numOfItems;
|
||||
qall->current = queue->head;
|
||||
qall->start = queue->head;
|
||||
qall->numOfItems = queue->numOfItems;
|
||||
qall->itemSize = queue->itemSize;
|
||||
code = qall->numOfItems;
|
||||
*phandle = queue->ahandle;
|
||||
|
||||
queue->head = NULL;
|
||||
queue->tail = NULL;
|
||||
queue->numOfItems = 0;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
||||
}
|
||||
queue->head = NULL;
|
||||
queue->tail = NULL;
|
||||
queue->numOfItems = 0;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
@ -359,8 +361,6 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
|
|||
if (code != 0) break;
|
||||
}
|
||||
|
||||
*res = qall;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
|
|||
struct sockaddr_in serverAddr, clientAddr;
|
||||
int ret;
|
||||
|
||||
pTrace("open tcp client socket:%s:%d", destIp, destPort);
|
||||
// pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp);
|
||||
|
||||
sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
|
||||
|
||||
|
@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
|
|||
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
|
||||
|
||||
if (ret != 0) {
|
||||
pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno));
|
||||
//pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno));
|
||||
taosCloseSocket(sockFd);
|
||||
sockFd = -1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue