adjust tqueue codes

This commit is contained in:
Shengliang Guan 2021-10-29 17:11:15 +08:00
parent 117d2ce4de
commit 139a4c7fb1
10 changed files with 339 additions and 370 deletions

View File

@ -37,21 +37,24 @@ shall be used to set up the protection.
*/ */
typedef void* taos_queue; typedef void *taos_queue;
typedef void* taos_qset; typedef void *taos_qset;
typedef void* taos_qall; typedef void *taos_qall;
typedef void *(*FProcessOneItem)(void *pItem, void *ahandle);
typedef void *(*FProcessAllItem)(taos_qall qall, int numOfItems, void *ahandle);
taos_queue taosOpenQueue(); taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue); void taosCloseQueue(taos_queue);
void taosSetQueueFp(taos_queue, FProcessOneItem, FProcessAllItem fp);
void *taosAllocateQitem(int size); void *taosAllocateQitem(int size);
void taosFreeQitem(void *item); void taosFreeQitem(void *pItem);
int taosWriteQitem(taos_queue, int type, void *item); int taosWriteQitem(taos_queue, void *pItem);
int taosReadQitem(taos_queue, int *type, void **pitem); int taosReadQitem(taos_queue, void **pItem);
taos_qall taosAllocateQall(); taos_qall taosAllocateQall();
void taosFreeQall(taos_qall); void taosFreeQall(taos_qall);
int taosReadAllQitems(taos_queue, taos_qall); int taosReadAllQitems(taos_queue, taos_qall);
int taosGetQitem(taos_qall, int *type, void **pitem); int taosGetQitem(taos_qall, void **pItem);
void taosResetQitems(taos_qall); void taosResetQitems(taos_qall);
taos_qset taosOpenQset(); taos_qset taosOpenQset();
@ -61,8 +64,8 @@ int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue); void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset); int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle); int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessOneItem *);
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle); int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessAllItem *);
int taosGetQueueItemsNumber(taos_queue param); int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param); int taosGetQsetItemsNumber(taos_qset param);

View File

@ -22,13 +22,6 @@
extern "C" { extern "C" {
#endif #endif
typedef int32_t (*ProcessStartFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef void (*ProcessEndFp)(void *ahandle, void *pMsg, int32_t qtype, int32_t code);
typedef bool (*ProcessWriteStartFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef void (*ProcessWriteSyncFp)(void *ahandle, int32_t code);
typedef void (*ProcessWriteEndFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef struct SWorker { typedef struct SWorker {
int32_t id; // worker ID int32_t id; // worker ID
pthread_t thread; // thread pthread_t thread; // thread
@ -40,41 +33,36 @@ typedef struct SWorkerPool {
int32_t min; // min number of workers int32_t min; // min number of workers
int32_t num; // current number of workers int32_t num; // current number of workers
taos_qset qset; taos_qset qset;
const char * name; const char *name;
ProcessStartFp startFp; SWorker *workers;
ProcessEndFp endFp;
SWorker * workers;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWorkerPool; } SWorkerPool;
typedef struct SWriteWorker { typedef struct SMWorker {
int32_t id; // worker id int32_t id; // worker id
pthread_t thread; // thread pthread_t thread; // thread
taos_qall qall; taos_qall qall;
taos_qset qset; // queue set taos_qset qset; // queue set
struct SWriteWorkerPool *pool; struct SMWorkerPool *pool;
} SWriteWorker; } SMWorker;
typedef struct SWriteWorkerPool { typedef struct SMWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
const char * name; const char *name;
ProcessWriteStartFp startFp; SMWorker *workers;
ProcessWriteSyncFp syncFp; pthread_mutex_t mutex;
ProcessWriteEndFp endFp; } SMWorkerPool;
SWriteWorker * workers;
pthread_mutex_t mutex;
} SWriteWorkerPool;
int32_t tWorkerInit(SWorkerPool *pool); int32_t tWorkerInit(SWorkerPool *pool);
void tWorkerCleanup(SWorkerPool *pool); void tWorkerCleanup(SWorkerPool *pool);
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle); taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessOneItem fp);
void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue); void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue);
int32_t tWriteWorkerInit(SWriteWorkerPool *pool); int32_t tMWorkerInit(SMWorkerPool *pool);
void tWriteWorkerCleanup(SWriteWorkerPool *pool); void tMWorkerCleanup(SMWorkerPool *pool);
taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle); taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessAllItem fp);
void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue); void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -28,10 +28,10 @@ static struct {
SWorkerPool write; SWorkerPool write;
SWorkerPool peerReq; SWorkerPool peerReq;
SWorkerPool peerRsp; SWorkerPool peerRsp;
taos_queue readQ; taos_queue readQ;
taos_queue writeQ; taos_queue writeQ;
taos_queue peerReqQ; taos_queue peerReqQ;
taos_queue peerRspQ; taos_queue peerRspQ;
int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
@ -81,7 +81,7 @@ static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} else { } else {
mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg); taosWriteQitem(tsMworker.writeQ, pMsg);
} }
} }
@ -93,7 +93,7 @@ void mnodeReDispatchToWriteQueue(SMnMsg *pMsg) {
mnodeSendRedirectMsg(&pMsg->rpcMsg, true); mnodeSendRedirectMsg(&pMsg->rpcMsg, true);
mnodeCleanupMsg(pMsg); mnodeCleanupMsg(pMsg);
} else { } else {
taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg); taosWriteQitem(tsMworker.writeQ, pMsg);
} }
} }
@ -107,7 +107,7 @@ static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} else { } else {
mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.readQ, TAOS_QTYPE_RPC, pMsg); taosWriteQitem(tsMworker.readQ, pMsg);
} }
} }
@ -125,7 +125,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
} else { } else {
mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle, mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType]); taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.peerReqQ, TAOS_QTYPE_RPC, pMsg); taosWriteQitem(tsMworker.peerReqQ, pMsg);
} }
} }
@ -140,13 +140,13 @@ void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) {
} else { } else {
mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle, mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType]); taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.peerRspQ, TAOS_QTYPE_RPC, pMsg); taosWriteQitem(tsMworker.peerRspQ, pMsg);
} }
// rpcFreeCont(pRpcMsg->pCont); // rpcFreeCont(pRpcMsg->pCont);
} }
static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) { void mnodeSendRsp(SMnMsg *pMsg, int32_t code) {
if (pMsg == NULL) return; if (pMsg == NULL) return;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
@ -155,22 +155,16 @@ static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle, .handle = pMsg->rpcMsg.handle,
.pCont = pMsg->rpcRsp.rsp, .pCont = pMsg->rpcRsp.rsp,
.contLen = pMsg->rpcRsp.len, .contLen = pMsg->rpcRsp.len,
.code = code, .code = code,
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
mnodeCleanupMsg(pMsg); mnodeCleanupMsg(pMsg);
} }
void mnodeSendRsp(SMnMsg *pMsg, int32_t code) { mnodeSendRpcRsp(NULL, pMsg, 0, code); }
static void mnodeProcessPeerRspEnd(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) {
mnodeCleanupMsg(pMsg);
}
static void mnodeInitMsgFp() { static void mnodeInitMsgFp() {
// // peer req // // peer req
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue; // tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue;
@ -290,13 +284,15 @@ static void mnodeInitMsgFp() {
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessKillConnectionMsg; // tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessKillConnectionMsg;
} }
static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) { static void mnodeProcessWriteReq(SMnMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType; int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle; void *ahandle = pMsg->rpcMsg.ahandle;
int32_t code = 0;
if (pMsg->rpcMsg.pCont == NULL) { if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN; code = TSDB_CODE_MND_INVALID_MSG_LEN;
goto PROCESS_WRITE_REQ_END;
} }
if (!mnodeIsMaster()) { if (!mnodeIsMaster()) {
@ -309,31 +305,39 @@ static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse); taosMsg[msgType], epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT; code = TSDB_CODE_RPC_REDIRECT;
goto PROCESS_WRITE_REQ_END;
} }
if (tsMworker.writeMsgFp[msgType] == NULL) { if (tsMworker.writeMsgFp[msgType] == NULL) {
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED; code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
goto PROCESS_WRITE_REQ_END;
} }
return (*tsMworker.writeMsgFp[msgType])(pMsg); code = (*tsMworker.writeMsgFp[msgType])(pMsg);
PROCESS_WRITE_REQ_END:
mnodeSendRsp(pMsg, code);
} }
static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) { static void mnodeProcessReadReq(SMnMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType; int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle; void *ahandle = pMsg->rpcMsg.ahandle;
int32_t code = 0;
if (pMsg->rpcMsg.pCont == NULL) { if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]); mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN; code = TSDB_CODE_MND_INVALID_MSG_LEN;
goto PROCESS_READ_REQ_END;
} }
if (!mnodeIsMaster()) { if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp; SMnRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
if (!epSet) { if (!epSet) {
return TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto PROCESS_READ_REQ_END;
} }
mnodeGetMnodeEpSetForShell(epSet, true); mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
@ -341,25 +345,32 @@ static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) {
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
epSet->numOfEps, epSet->inUse); epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT; code = TSDB_CODE_RPC_REDIRECT;
goto PROCESS_READ_REQ_END;
} }
if (tsMworker.readMsgFp[msgType] == NULL) { if (tsMworker.readMsgFp[msgType] == NULL) {
mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]); mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED; code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
goto PROCESS_READ_REQ_END;
} }
mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]); mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
return (*tsMworker.readMsgFp[msgType])(pMsg); code = (*tsMworker.readMsgFp[msgType])(pMsg);
PROCESS_READ_REQ_END:
mnodeSendRsp(pMsg, code);
} }
static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) { static void mnodeProcessPeerReq(SMnMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType; int32_t msgType = pMsg->rpcMsg.msgType;
void * ahandle = pMsg->rpcMsg.ahandle; void *ahandle = pMsg->rpcMsg.ahandle;
int32_t code = 0;
if (pMsg->rpcMsg.pCont == NULL) { if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]); mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN; code = TSDB_CODE_MND_INVALID_MSG_LEN;
goto PROCESS_PEER_REQ_END;
} }
if (!mnodeIsMaster()) { if (!mnodeIsMaster()) {
@ -372,24 +383,29 @@ static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse); taosMsg[msgType], epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT; code = TSDB_CODE_RPC_REDIRECT;
goto PROCESS_PEER_REQ_END;
} }
if (tsMworker.peerReqFp[msgType] == NULL) { if (tsMworker.peerReqFp[msgType] == NULL) {
mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]); mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED; code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
goto PROCESS_PEER_REQ_END;
} }
return (*tsMworker.peerReqFp[msgType])(pMsg); code = (*tsMworker.peerReqFp[msgType])(pMsg);
PROCESS_PEER_REQ_END:
mnodeSendRsp(pMsg, code);
} }
static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) { static void mnodeProcessPeerRsp(SMnMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType; int32_t msgType = pMsg->rpcMsg.msgType;
SRpcMsg *pRpcMsg = &pMsg->rpcMsg; SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
if (!mnodeIsMaster()) { if (!mnodeIsMaster()) {
mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
return 0; mnodeCleanupMsg(pMsg);
} }
if (tsMworker.peerRspFp[msgType]) { if (tsMworker.peerRspFp[msgType]) {
@ -398,7 +414,7 @@ static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) {
mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
} }
return 0; mnodeCleanupMsg(pMsg);
} }
int32_t mnodeInitWorker() { int32_t mnodeInitWorker() {
@ -406,20 +422,16 @@ int32_t mnodeInitWorker() {
SWorkerPool *pPool = &tsMworker.write; SWorkerPool *pPool = &tsMworker.write;
pPool->name = "mnode-write"; pPool->name = "mnode-write";
pPool->startFp = (ProcessStartFp)mnodeProcessWriteReq;
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
pPool->min = 1; pPool->min = 1;
pPool->max = 1; pPool->max = 1;
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} else { } else {
tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL); tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessWriteReq);
} }
pPool = &tsMworker.read; pPool = &tsMworker.read;
pPool->name = "mnode-read"; pPool->name = "mnode-read";
pPool->startFp = (ProcessStartFp)mnodeProcessReadReq;
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
pPool->min = 2; pPool->min = 2;
pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2); pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2);
pPool->max = MAX(2, pPool->max); pPool->max = MAX(2, pPool->max);
@ -427,31 +439,27 @@ int32_t mnodeInitWorker() {
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} else { } else {
tsMworker.readQ = tWorkerAllocQueue(pPool, NULL); tsMworker.readQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessReadReq);
} }
pPool = &tsMworker.peerReq; pPool = &tsMworker.peerReq;
pPool->name = "mnode-peer-req"; pPool->name = "mnode-peer-req";
pPool->startFp = (ProcessStartFp)mnodeProcessPeerReq;
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
pPool->min = 1; pPool->min = 1;
pPool->max = 1; pPool->max = 1;
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} else { } else {
tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL); tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessPeerReq);
} }
pPool = &tsMworker.peerRsp; pPool = &tsMworker.peerRsp;
pPool->name = "mnode-peer-rsp"; pPool->name = "mnode-peer-rsp";
pPool->startFp = (ProcessStartFp)mnodeProcessPeerRsp;
pPool->endFp = (ProcessEndFp)mnodeProcessPeerRspEnd;
pPool->min = 1; pPool->min = 1;
pPool->max = 1; pPool->max = 1;
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} else { } else {
tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL); tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessPeerRsp);
} }
mInfo("mnode worker is initialized"); mInfo("mnode worker is initialized");

View File

@ -23,11 +23,12 @@ extern "C" {
int32_t vnodeInitWrite(); int32_t vnodeInitWrite();
void vnodeCleanupWrite(); void vnodeCleanupWrite();
taos_queue vnodeAllocWriteQueue(SVnode *pVnode); taos_queue vnodeAllocWriteQueue(SVnode *pVnode);
void vnodeFreeWriteQueue(taos_queue pQueue); void vnodeFreeWriteQueue(taos_queue pQueue);
taos_queue vnodeAllocApplyQueue(SVnode *pVnode);
void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg); void vnodeFreeApplyQueue(taos_queue pQueue);
int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead); void vnodeProcessWriteReq(SRpcMsg *pRpcMsg);
void vnodeStartWrite(SVnode *pVnode); void vnodeStartWrite(SVnode *pVnode);
void vnodeStopWrite(SVnode *pVnode); void vnodeStopWrite(SVnode *pVnode);

View File

@ -54,17 +54,17 @@ static void vnodeInitMsgFp() {
tsVint.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg;
tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg;
tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg;
tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteReq;
// mq related // mq related
tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; tsVint.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteReq;
tsVint.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVint.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
// mq related end // mq related end

View File

@ -194,22 +194,18 @@ static int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg) {
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
} }
static int32_t vnodeProcessMgmtStart(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype) { static void vnodeProcessMgmtReq(SVnMgmtMsg *pMgmt, void *unused) {
SRpcMsg *pMsg = &pMgmt->rpcMsg; SRpcMsg *pMsg = &pMgmt->rpcMsg;
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
int32_t code = 0;
if (tsVmgmt.msgFp[msgType]) { if (tsVmgmt.msgFp[msgType]) {
vTrace("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[msgType]); vTrace("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[msgType]);
return (*tsVmgmt.msgFp[msgType])(pMsg); code = (*tsVmgmt.msgFp[msgType])(pMsg);
} else { } else {
vError("msg:%p, ahandle:%p type:%s not processed since no handle", pMgmt, pMsg->ahandle, taosMsg[msgType]); vError("msg:%p, ahandle:%p type:%s not processed since no handle", pMgmt, pMsg->ahandle, taosMsg[msgType]);
return TSDB_CODE_DND_MSG_NOT_PROCESSED; code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} }
}
static void vnodeProcessMgmtEnd(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype, int32_t code) {
SRpcMsg *pMsg = &pMgmt->rpcMsg;
vTrace("msg:%p, is processed, result:%s", pMgmt, tstrerror(code));
SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
@ -235,9 +231,9 @@ static int32_t vnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
if (pMsg->msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE) { if (pMsg->msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE) {
return taosWriteQitem(tsVmgmt.createQueue, TAOS_QTYPE_RPC, pMgmt); return taosWriteQitem(tsVmgmt.createQueue, pMgmt);
} else { } else {
return taosWriteQitem(tsVmgmt.workerQueue, TAOS_QTYPE_RPC, pMgmt); return taosWriteQitem(tsVmgmt.workerQueue, pMgmt);
} }
} }
@ -257,27 +253,23 @@ int32_t vnodeInitMgmt() {
SWorkerPool *pPool = &tsVmgmt.createPool; SWorkerPool *pPool = &tsVmgmt.createPool;
pPool->name = "vnode-mgmt-create"; pPool->name = "vnode-mgmt-create";
pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart;
pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd;
pPool->min = 1; pPool->min = 1;
pPool->max = 1; pPool->max = 1;
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
tsVmgmt.createQueue = tWorkerAllocQueue(pPool, NULL); tsVmgmt.createQueue = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)vnodeProcessMgmtReq);
pPool = &tsVmgmt.workerPool; pPool = &tsVmgmt.workerPool;
pPool->name = "vnode-mgmt-worker"; pPool->name = "vnode-mgmt-worker";
pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart;
pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd;
pPool->min = 1; pPool->min = 1;
pPool->max = 1; pPool->max = 1;
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
tsVmgmt.workerQueue = tWorkerAllocQueue(pPool, NULL); tsVmgmt.workerQueue = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)vnodeProcessMgmtReq);
vInfo("vmgmt is initialized"); vInfo("vmgmt is initialized");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -71,9 +71,9 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen,
atomic_add_fetch_32(&pVnode->queuedRMsg, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
return taosWriteQitem(pVnode->pFetchQ, qtype, pRead); return taosWriteQitem(pVnode->pFetchQ, pRead);
} else { } else {
return taosWriteQitem(pVnode->pQueryQ, qtype, pRead); return taosWriteQitem(pVnode->pQueryQ, pRead);
} }
} }
@ -136,18 +136,6 @@ static void vnodeInitReadMsgFp() {
tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg; tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg;
} }
static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) {
int32_t msgType = pRead->msgType;
if (tsVread.msgFp[msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
} else {
vTrace("msg:%p, app:%p type:%s will be processed", pRead, pRead->rpcAhandle, taosMsg[msgType]);
}
return (*tsVread.msgFp[msgType])(pVnode, pRead);
}
static void vnodeSendReadRsp(SReadMsg *pRead, int32_t code) { static void vnodeSendReadRsp(SReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcHandle, .handle = pRead->rpcHandle,
@ -159,8 +147,18 @@ static void vnodeSendReadRsp(SReadMsg *pRead, int32_t code) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void vnodeProcessReadEnd(SVnode *pVnode, SReadMsg *pRead, int32_t qtype, int32_t code) { static void vnodeProcessReadReq(SReadMsg *pRead, SVnode *pVnode) {
if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { int32_t msgType = pRead->msgType;
int32_t code = 0;
if (tsVread.msgFp[msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
code = TSDB_CODE_VND_MSG_NOT_PROCESSED;
} else {
vTrace("msg:%p, app:%p type:%s will be processed", pRead, pRead->rpcAhandle, taosMsg[msgType]);
code = (*tsVread.msgFp[msgType])(pVnode, pRead);
}
if (/*qtype == TAOS_QTYPE_RPC && */ code != TSDB_CODE_QRY_NOT_READY) {
vnodeSendReadRsp(pRead, code); vnodeSendReadRsp(pRead, code);
} else { } else {
if (code == TSDB_CODE_QRY_HAS_RSP) { if (code == TSDB_CODE_QRY_HAS_RSP) {
@ -181,16 +179,12 @@ int32_t vnodeInitRead() {
SWorkerPool *pPool = &tsVread.query; SWorkerPool *pPool = &tsVread.query;
pPool->name = "vquery"; pPool->name = "vquery";
pPool->startFp = (ProcessStartFp)vnodeProcessReadStart;
pPool->endFp = (ProcessEndFp)vnodeProcessReadEnd;
pPool->min = (int32_t)threadsForQuery; pPool->min = (int32_t)threadsForQuery;
pPool->max = pPool->min; pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) return -1; if (tWorkerInit(pPool) != 0) return -1;
pPool = &tsVread.fetch; pPool = &tsVread.fetch;
pPool->name = "vfetch"; pPool->name = "vfetch";
pPool->startFp = (ProcessStartFp)vnodeProcessReadStart;
pPool->endFp = (ProcessEndFp)vnodeProcessReadEnd;
pPool->min = MIN(maxFetchThreads, tsNumOfCores); pPool->min = MIN(maxFetchThreads, tsNumOfCores);
pPool->max = pPool->min; pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) return -1; if (tWorkerInit(pPool) != 0) return -1;
@ -205,9 +199,13 @@ void vnodeCleanupRead() {
vInfo("vread is closed"); vInfo("vread is closed");
} }
taos_queue vnodeAllocQueryQueue(SVnode *pVnode) { return tWorkerAllocQueue(&tsVread.query, pVnode); } taos_queue vnodeAllocQueryQueue(SVnode *pVnode) {
return tWorkerAllocQueue(&tsVread.query, pVnode, (FProcessOneItem)vnodeProcessReadReq);
}
taos_queue vnodeAllocFetchQueue(SVnode *pVnode) { return tWorkerAllocQueue(&tsVread.fetch, pVnode); } taos_queue vnodeAllocFetchQueue(SVnode *pVnode) {
return tWorkerAllocQueue(&tsVread.fetch, pVnode, (FProcessOneItem)vnodeProcessReadReq);
}
void vnodeFreeQueryQueue(taos_queue pQueue) { tWorkerFreeQueue(&tsVread.query, pQueue); } void vnodeFreeQueryQueue(taos_queue pQueue) { tWorkerFreeQueue(&tsVread.query, pQueue); }

View File

@ -32,15 +32,13 @@ typedef struct {
} SVnWriteMsg; } SVnWriteMsg;
static struct { static struct {
SWriteWorkerPool pool; SMWorkerPool pool;
int64_t queuedBytes; int64_t queuedBytes;
int32_t queuedMsgs; int32_t queuedMsgs;
} tsVwrite = {0}; } tsVwrite = {0};
void vnodeStartWrite(SVnode *pVnode) {} void vnodeStartWrite(SVnode *pVnode) {}
void vnodeStoprite(SVnode *pVnode) {} void vnodeStopWrite(SVnode *pVnode) {
void vnodeWaitWriteCompleted(SVnode *pVnode) {
while (pVnode->queuedWMsg > 0) { while (pVnode->queuedWMsg > 0) {
vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg); vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg);
taosMsleep(10); taosMsleep(10);
@ -86,7 +84,7 @@ static int32_t vnodeWriteToWQueue(SVnode *pVnode, SWalHead *pHead, int32_t qtype
atomic_add_fetch_32(&tsVwrite.queuedMsgs, 1); atomic_add_fetch_32(&tsVwrite.queuedMsgs, 1);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
atomic_add_fetch_32(&pVnode->queuedWMsg, 1); atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
taosWriteQitem(pVnode->pWriteQ, pWrite->qtype, pWrite); taosWriteQitem(pVnode->pWriteQ, pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -101,11 +99,7 @@ static void vnodeFreeFromWQueue(SVnode *pVnode, SVnWriteMsg *pWrite) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead) { void vnodeProcessWriteReq(SRpcMsg *pRpcMsg) {
return vnodeWriteToWQueue(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
}
void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg) {
int32_t code; int32_t code;
SMsgHead *pMsg = pRpcMsg->pCont; SMsgHead *pMsg = pRpcMsg->pCont;
@ -132,109 +126,104 @@ void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg) {
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
} }
static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t qtype) { static void vnodeProcessWrite(SVnWriteMsg *pWrite, SVnode *pVnode) {
SWalHead *pHead = &pWrite->walHead;
SVnRsp * pRet = &pWrite->rspRet;
int32_t msgType = pHead->msgType;
vTrace("vgId:%d, msg:%s will be processed, hver:%" PRIu64, pVnode->vgId, taosMsg[pHead->msgType], pHead->version);
// write into WAL
#if 0
pWrite->code = walWrite(pVnode->wal, pHead);
if (pWrite->code < 0) return false;
pVnode->version = pHead->version;
#endif
// write data locally
switch (msgType) {
case TSDB_MSG_TYPE_SUBMIT:
pRet->len = sizeof(SSubmitRsp);
pRet->rsp = rpcMallocCont(pRet->len);
pWrite->code = vnodeProcessSubmitReq(pVnode, (void*)pHead->cont, pRet->rsp);
break;
case TSDB_MSG_TYPE_MD_CREATE_TABLE:
pWrite->code = vnodeProcessCreateTableReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MD_DROP_TABLE:
pWrite->code = vnodeProcessDropTableReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MD_ALTER_TABLE:
pWrite->code = vnodeProcessAlterTableReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MD_DROP_STABLE:
pWrite->code = vnodeProcessDropStableReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_UPDATE_TAG_VAL:
pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL);
break;
//mq related
case TSDB_MSG_TYPE_MQ_CONNECT:
pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_DISCONNECT:
pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_ACK:
pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_RESET:
pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL);
break;
//mq related end
default:
pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED;
break;
}
if (pWrite->code < 0) return false;
// update fsync
return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT);
} }
// SWalHead *pHead = &pWrite->walHead;
// SVnRsp * pRet = &pWrite->rspRet;
// int32_t msgType = pHead->msgType;
static void vnodeFsync(SVnode *pVnode, bool fsync) { // vTrace("vgId:%d, msg:%s will be processed, hver:%" PRIu64, pVnode->vgId, taosMsg[pHead->msgType], pHead->version);
#if 0
walFsync(pVnode->wal, fsync);
#endif
}
static void vnodeProcessWriteEnd(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t qtype, int32_t code) { // // write data locally
if (qtype == TAOS_QTYPE_RPC) { // switch (msgType) {
SRpcMsg rpcRsp = { // case TSDB_MSG_TYPE_SUBMIT:
.handle = pWrite->rpcMsg.handle, // pRet->len = sizeof(SSubmitRsp);
.pCont = pWrite->rspRet.rsp, // pRet->rsp = rpcMallocCont(pRet->len);
.contLen = pWrite->rspRet.len, // pWrite->code = vnodeProcessSubmitReq(pVnode, (void*)pHead->cont, pRet->rsp);
.code = pWrite->code, // break;
}; // case TSDB_MSG_TYPE_MD_CREATE_TABLE:
rpcSendResponse(&rpcRsp); // pWrite->code = vnodeProcessCreateTableReq(pVnode, (void*)pHead->cont, NULL);
} else { // break;
if (pWrite->rspRet.rsp) { // case TSDB_MSG_TYPE_MD_DROP_TABLE:
rpcFreeCont(pWrite->rspRet.rsp); // pWrite->code = vnodeProcessDropTableReq(pVnode, (void*)pHead->cont, NULL);
} // break;
} // case TSDB_MSG_TYPE_MD_ALTER_TABLE:
vnodeFreeFromWQueue(pVnode, pWrite); // pWrite->code = vnodeProcessAlterTableReq(pVnode, (void*)pHead->cont, NULL);
} // break;
// case TSDB_MSG_TYPE_MD_DROP_STABLE:
// pWrite->code = vnodeProcessDropStableReq(pVnode, (void*)pHead->cont, NULL);
// break;
// case TSDB_MSG_TYPE_UPDATE_TAG_VAL:
// pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL);
// break;
// //mq related
// case TSDB_MSG_TYPE_MQ_CONNECT:
// pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL);
// break;
// case TSDB_MSG_TYPE_MQ_DISCONNECT:
// pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL);
// break;
// case TSDB_MSG_TYPE_MQ_ACK:
// pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL);
// break;
// case TSDB_MSG_TYPE_MQ_RESET:
// pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL);
// break;
// //mq related end
// default:
// pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED;
// break;
// }
// if (pWrite->code < 0) return false;
// // update fsync
// return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT);
// // walFsync(pVnode->wal, fsync);
// static void vnodeProcessWriteEnd(SVnWriteMsg *pWrite, SVnode *pVnode) {
// if (qtype == TAOS_QTYPE_RPC) {
// SRpcMsg rpcRsp = {
// .handle = pWrite->rpcMsg.handle,
// .pCont = pWrite->rspRet.rsp,
// .contLen = pWrite->rspRet.len,
// .code = pWrite->code,
// };
// rpcSendResponse(&rpcRsp);
// } else {
// if (pWrite->rspRet.rsp) {
// rpcFreeCont(pWrite->rspRet.rsp);
// }
// }
// vnodeFreeFromWQueue(pVnode, pWrite);
// }
int32_t vnodeInitWrite() { int32_t vnodeInitWrite() {
SWriteWorkerPool *pPool = &tsVwrite.pool; SMWorkerPool *pPool = &tsVwrite.pool;
pPool->name = "vwrite"; pPool->name = "vnode-write";
pPool->max = tsNumOfCores; pPool->max = tsNumOfCores;
pPool->startFp = (ProcessWriteStartFp)vnodeProcessWriteStart; if (tMWorkerInit(pPool) != 0) return -1;
pPool->syncFp = (ProcessWriteSyncFp)vnodeFsync;
pPool->endFp = (ProcessWriteEndFp)vnodeProcessWriteEnd;
if (tWriteWorkerInit(pPool) != 0) return -1;
vInfo("vwrite is initialized, max worker %d", pPool->max); vInfo("vwrite is initialized, max worker %d", pPool->max);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void vnodeCleanupWrite() { void vnodeCleanupWrite() {
tWriteWorkerCleanup(&tsVwrite.pool); tMWorkerCleanup(&tsVwrite.pool);
vInfo("vwrite is closed"); vInfo("vwrite is closed");
} }
taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } taos_queue vnodeAllocWriteQueue(SVnode *pVnode) {
return tMWorkerAllocQueue(&tsVwrite.pool, pVnode, NULL);
}
void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } void vnodeFreeWriteQueue(taos_queue pQueue) { tMWorkerFreeQueue(&tsVwrite.pool, pQueue); }
taos_queue vnodeAllocApplyQueue(SVnode *pVnode) {
return tMWorkerAllocQueue(&tsVwrite.pool, pVnode, NULL);
}
void vnodeFreeApplyQueue(taos_queue pQueue) { tMWorkerFreeQueue(&tsVwrite.pool, pQueue); }

View File

@ -19,41 +19,41 @@
#include "tqueue.h" #include "tqueue.h"
typedef struct STaosQnode { typedef struct STaosQnode {
int type; struct STaosQnode *next;
struct STaosQnode *next; char item[];
char item[];
} STaosQnode; } STaosQnode;
typedef struct STaosQueue { typedef struct STaosQueue {
int32_t itemSize; int32_t itemSize;
int32_t numOfItems; int32_t numOfItems;
struct STaosQnode *head; struct STaosQnode *head;
struct STaosQnode *tail; struct STaosQnode *tail;
struct STaosQueue *next; // for queue set struct STaosQueue *next; // for queue set
struct STaosQset *qset; // for queue set struct STaosQset *qset; // for queue set
void *ahandle; // for queue set void *ahandle; // for queue set
pthread_mutex_t mutex; FProcessOneItem fpOneItem;
FProcessAllItem fpAllItem;
pthread_mutex_t mutex;
} STaosQueue; } STaosQueue;
typedef struct STaosQset { typedef struct STaosQset {
STaosQueue *head; STaosQueue *head;
STaosQueue *current; STaosQueue *current;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfQueues; int32_t numOfQueues;
int32_t numOfItems; int32_t numOfItems;
tsem_t sem; tsem_t sem;
} STaosQset; } STaosQset;
typedef struct STaosQall { typedef struct STaosQall {
STaosQnode *current; STaosQnode *current;
STaosQnode *start; STaosQnode *start;
int32_t itemSize; int32_t itemSize;
int32_t numOfItems; int32_t numOfItems;
} STaosQall; } STaosQall;
taos_queue taosOpenQueue() { taos_queue taosOpenQueue() {
STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1);
STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
if (queue == NULL) { if (queue == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL; return NULL;
@ -65,6 +65,13 @@ taos_queue taosOpenQueue() {
return queue; return queue;
} }
void taosSetQueueFp(taos_queue param, FProcessOneItem fpOneItem, FProcessAllItem fpAllItem) {
if (param == NULL) return;
STaosQueue *queue = (STaosQueue *)param;
queue->fpOneItem = fpOneItem;
queue->fpAllItem = fpAllItem;
}
void taosCloseQueue(taos_queue param) { void taosCloseQueue(taos_queue param) {
if (param == NULL) return; if (param == NULL) return;
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
@ -72,17 +79,17 @@ void taosCloseQueue(taos_queue param) {
STaosQset *qset; STaosQset *qset;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
STaosQnode *pNode = queue->head; STaosQnode *pNode = queue->head;
queue->head = NULL; queue->head = NULL;
qset = queue->qset; qset = queue->qset;
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
if (queue->qset) taosRemoveFromQset(qset, queue); if (queue->qset) taosRemoveFromQset(qset, queue);
while (pNode) { while (pNode) {
pTemp = pNode; pTemp = pNode;
pNode = pNode->next; pNode = pNode->next;
free (pTemp); free(pTemp);
} }
pthread_mutex_destroy(&queue->mutex); pthread_mutex_destroy(&queue->mutex);
@ -93,7 +100,7 @@ void taosCloseQueue(taos_queue param) {
void *taosAllocateQitem(int size) { void *taosAllocateQitem(int size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
if (pNode == NULL) return NULL; if (pNode == NULL) return NULL;
uTrace("item:%p, node:%p is allocated", pNode->item, pNode); uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
return (void *)pNode->item; return (void *)pNode->item;
@ -108,10 +115,9 @@ void taosFreeQitem(void *param) {
free(temp); free(temp);
} }
int taosWriteQitem(taos_queue param, int type, void *item) { int taosWriteQitem(taos_queue param, void *item) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
pNode->type = type;
pNode->next = NULL; pNode->next = NULL;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
@ -121,12 +127,12 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue->tail = pNode; queue->tail = pNode;
} else { } else {
queue->head = pNode; queue->head = pNode;
queue->tail = pNode; queue->tail = pNode;
} }
queue->numOfItems++; queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems); uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
@ -135,7 +141,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
return 0; return 0;
} }
int taosReadQitem(taos_queue param, int *type, void **pitem) { int taosReadQitem(taos_queue param, void **pitem) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int code = 0;
@ -143,17 +149,15 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
*pitem = pNode->item; *pitem = pNode->item;
*type = pNode->type; queue->head = pNode->next;
queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL;
if (queue->head == NULL) queue->numOfItems--;
queue->tail = NULL; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
queue->numOfItems--; code = 1;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
code = 1; }
uDebug("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems);
}
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
@ -165,9 +169,7 @@ void *taosAllocateQall() {
return p; return p;
} }
void taosFreeQall(void *param) { void taosFreeQall(void *param) { free(param); }
free(param);
}
int taosReadAllQitems(taos_queue param, taos_qall p2) { int taosReadAllQitems(taos_queue param, taos_qall p2) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
@ -203,33 +205,30 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) {
return code; return code;
} }
int taosGetQitem(taos_qall param, int *type, void **pitem) { int taosGetQitem(taos_qall param, void **pitem) {
STaosQall *qall = (STaosQall *)param; STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode; STaosQnode *pNode;
int num = 0; int num = 0;
pNode = qall->current; pNode = qall->current;
if (pNode) if (pNode) qall->current = pNode->next;
qall->current = pNode->next;
if (pNode) { if (pNode) {
*pitem = pNode->item; *pitem = pNode->item;
*type = pNode->type;
num = 1; num = 1;
uTrace("item:%p is fetched, type:%d", *pitem, *type); uTrace("item:%p is fetched", *pitem);
} }
return num; return num;
} }
void taosResetQitems(taos_qall param) { void taosResetQitems(taos_qall param) {
STaosQall *qall = (STaosQall *)param; STaosQall *qall = (STaosQall *)param;
qall->current = qall->start; qall->current = qall->start;
} }
taos_qset taosOpenQset() { taos_qset taosOpenQset() {
STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1);
STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
if (qset == NULL) { if (qset == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL; return NULL;
@ -276,7 +275,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2; STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1; STaosQset *qset = (STaosQset *)p1;
if (queue->qset) return -1; if (queue->qset) return -1;
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
@ -299,7 +298,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
void taosRemoveFromQset(taos_qset p1, taos_queue p2) { void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
STaosQueue *queue = (STaosQueue *)p2; STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1; STaosQset *qset = (STaosQset *)p1;
STaosQueue *tqueue = NULL; STaosQueue *tqueue = NULL;
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
@ -313,7 +312,7 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
tqueue = qset->head->next; tqueue = qset->head->next;
while (tqueue) { while (tqueue) {
assert(tqueue->qset); assert(tqueue->qset);
if (tqueue== queue) { if (tqueue == queue) {
prev->next = tqueue->next; prev->next = tqueue->next;
break; break;
} else { } else {
@ -333,29 +332,26 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
queue->next = NULL; queue->next = NULL;
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
} }
} }
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);
uTrace("queue:%p is removed from qset:%p", queue, qset); uTrace("queue:%p is removed from qset:%p", queue, qset);
} }
int taosGetQueueNumber(taos_qset param) { int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; }
return ((STaosQset *)param)->numOfQueues;
}
int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) { int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessOneItem *fpOneItem) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int code = 0;
tsem_wait(&qset->sem); tsem_wait(&qset->sem);
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
for(int i=0; i<qset->numOfQueues; ++i) { for (int i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) if (qset->current == NULL) qset->current = qset->head;
qset->current = qset->head;
STaosQueue *queue = qset->current; STaosQueue *queue = qset->current;
if (queue) qset->current = queue->next; if (queue) qset->current = queue->next;
if (queue == NULL) break; if (queue == NULL) break;
@ -364,18 +360,17 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
*pitem = pNode->item; *pitem = pNode->item;
if (type) *type = pNode->type; if (ahandle) *ahandle = queue->ahandle;
if (phandle) *phandle = queue->ahandle; if (fpOneItem) *fpOneItem = queue->fpOneItem;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) if (queue->head == NULL) queue->tail = NULL;
queue->tail = NULL; queue->numOfItems--;
queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1);
atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1;
code = 1; uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, pNode->type, queue->numOfItems); }
}
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
if (pNode) break; if (pNode) break;
@ -383,10 +378,10 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);
return code; return code;
} }
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessAllItem *fpAllItem) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
STaosQueue *queue; STaosQueue *queue;
STaosQall *qall = (STaosQall *)p2; STaosQall *qall = (STaosQall *)p2;
@ -411,8 +406,9 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
qall->numOfItems = queue->numOfItems; qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize; qall->itemSize = queue->itemSize;
code = qall->numOfItems; code = qall->numOfItems;
*phandle = queue->ahandle; if (ahandle) *ahandle = queue->ahandle;
if (fpAllItem) *fpAllItem = queue->fpAllItem;
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;

View File

@ -58,11 +58,11 @@ void tWorkerCleanup(SWorkerPool *pool) {
} }
static void *tWorkerThreadFp(SWorker *worker) { static void *tWorkerThreadFp(SWorker *worker) {
SWorkerPool *pool = worker->pool; SWorkerPool *pool = worker->pool;
FProcessOneItem fp = NULL;
void * msg = NULL; void *msg = NULL;
void * ahandle = NULL; void *ahandle = NULL;
int32_t qtype = 0;
int32_t code = 0; int32_t code = 0;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
@ -70,19 +70,20 @@ static void *tWorkerThreadFp(SWorker *worker) {
uDebug("worker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
if (taosReadQitemFromQset(pool->qset, &qtype, (void **)&msg, &ahandle) == 0) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break; break;
} }
code = (*pool->startFp)(ahandle, msg, qtype); if (fp) {
if (pool->endFp) (*pool->endFp)(ahandle, msg, qtype, code); (*fp)(msg, ahandle);
}
} }
return NULL; return NULL;
} }
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle) { taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessOneItem fp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
taos_queue queue = taosOpenQueue(); taos_queue queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
@ -90,6 +91,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle) {
return NULL; return NULL;
} }
taosSetQueueFp(queue, fp, NULL);
taosAddIntoQset(pool->qset, queue, ahandle); taosAddIntoQset(pool->qset, queue, ahandle);
// spawn a thread to process queue // spawn a thread to process queue
@ -122,14 +124,14 @@ void tWorkerFreeQueue(SWorkerPool *pool, void *queue) {
uDebug("worker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }
int32_t tWriteWorkerInit(SWriteWorkerPool *pool) { int32_t tMWorkerInit(SMWorkerPool *pool) {
pool->nextId = 0; pool->nextId = 0;
pool->workers = calloc(sizeof(SWriteWorker), pool->max); pool->workers = calloc(sizeof(SMWorker), pool->max);
if (pool->workers == NULL) return -1; if (pool->workers == NULL) return -1;
pthread_mutex_init(&pool->mutex, NULL); pthread_mutex_init(&pool->mutex, NULL);
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SWriteWorker *worker = pool->workers + i; SMWorker *worker = pool->workers + i;
worker->id = i; worker->id = i;
worker->qall = NULL; worker->qall = NULL;
worker->qset = NULL; worker->qset = NULL;
@ -140,16 +142,16 @@ int32_t tWriteWorkerInit(SWriteWorkerPool *pool) {
return 0; return 0;
} }
void tWriteWorkerCleanup(SWriteWorkerPool *pool) { void tMWorkerCleanup(SMWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SWriteWorker *worker = pool->workers + i; SMWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
if (worker->qset) taosQsetThreadResume(worker->qset); if (worker->qset) taosQsetThreadResume(worker->qset);
} }
} }
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SWriteWorker *worker = pool->workers + i; SMWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
pthread_join(worker->thread, NULL); pthread_join(worker->thread, NULL);
taosFreeQall(worker->qall); taosFreeQall(worker->qall);
@ -163,11 +165,12 @@ void tWriteWorkerCleanup(SWriteWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
} }
static void *tWriteWorkerThreadFp(SWriteWorker *worker) { static void *tWriteWorkerThreadFp(SMWorker *worker) {
SWriteWorkerPool *pool = worker->pool; SMWorkerPool *pool = worker->pool;
FProcessAllItem fp = NULL;
void * msg = NULL; void *msg = NULL;
void * ahandle = NULL; void *ahandle = NULL;
int32_t numOfMsgs = 0; int32_t numOfMsgs = 0;
int32_t qtype = 0; int32_t qtype = 0;
@ -176,34 +179,23 @@ static void *tWriteWorkerThreadFp(SWriteWorker *worker) {
uDebug("worker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle); numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
break; break;
} }
bool fsync = false; if (fp) {
for (int32_t i = 0; i < numOfMsgs; ++i) { (*fp)(worker->qall, numOfMsgs, ahandle);
taosGetQitem(worker->qall, &qtype, (void **)&msg);
fsync = fsync | (*pool->startFp)(ahandle, msg, qtype);
}
(*pool->syncFp)(ahandle, fsync);
// browse all items, and process them one by one
taosResetQitems(worker->qall);
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(worker->qall, &qtype, (void **)&msg);
(*pool->endFp)(ahandle, msg, qtype);
} }
} }
return NULL; return NULL;
} }
taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle) { taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessAllItem fp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
SWriteWorker *worker = pool->workers + pool->nextId; SMWorker *worker = pool->workers + pool->nextId;
taos_queue *queue = taosOpenQueue(); taos_queue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
@ -211,6 +203,8 @@ taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle) {
return NULL; return NULL;
} }
taosSetQueueFp(queue, NULL, fp);
if (worker->qset == NULL) { if (worker->qset == NULL) {
worker->qset = taosOpenQset(); worker->qset = taosOpenQset();
if (worker->qset == NULL) { if (worker->qset == NULL) {
@ -254,7 +248,7 @@ taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle) {
return queue; return queue;
} }
void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue) { void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }