roll back the rserver.c

This commit is contained in:
Jeff Tao 2020-03-27 21:39:11 +08:00
parent 4c60448baf
commit e485bb24b2
1 changed files with 22 additions and 13 deletions

View File

@ -28,23 +28,25 @@ void *qhandle = NULL;
void processShellMsg() { void processShellMsg() {
static int num = 0; static int num = 0;
taos_qall qall; taos_qall qall;
SRpcMsg rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, &qall); int numOfMsgs = taosReadAllQitems(qhandle, qall);
if (numOfMsgs <= 0) { if (numOfMsgs <= 0) {
usleep(1000); usleep(1000);
continue; continue;
} }
tTrace("%d shell msgs are received", numOfMsgs); tTrace("%d shell msgs are received", numOfMsgs);
sleep(5);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg); taosGetQitem(qall, &type, (void **)&pRpcMsg);
if (dataFd >=0) { if (dataFd >=0) {
if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno)); tPrint("failed to write data file, reason:%s", strerror(errno));
} }
} }
@ -63,19 +65,22 @@ void processShellMsg() {
taosResetQitems(qall); taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg);
rpcFreeCont(rpcMsg.pCont); taosGetQitem(qall, &type, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = rpcMsg.handle; rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 1; rpcMsg.code = 1;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg);
} }
taosFreeQitems(qall);
} }
taosFreeQall(qall);
/* /*
SRpcIpSet ipSet; SRpcIpSet ipSet;
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
@ -109,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
} }
void processRequestMsg(SRpcMsg *pMsg) { void processRequestMsg(SRpcMsg *pMsg) {
tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); SRpcMsg *pTemp;
taosWriteQitem(qhandle, pMsg);
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
@ -145,6 +155,7 @@ int main(int argc, char *argv[]) {
commit = atoi(argv[++i]); commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) { } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]); rpcDebugFlag = atoi(argv[++i]);
ddebugFlag = rpcDebugFlag;
uDebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag;
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
@ -191,5 +202,3 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }