Merge branch 'feature/update' into feature/update_test
This commit is contained in:
commit
ba4b483bc1
|
@ -20,9 +20,12 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t dnodeInitVnodeWrite();
|
||||
void dnodeCleanupVnodeWrite();
|
||||
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg);
|
||||
int32_t dnodeInitVWrite();
|
||||
void dnodeCleanupVWrite();
|
||||
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
|
||||
void * dnodeAllocVWriteQueue(void *pVnode);
|
||||
void dnodeFreeVWriteQueue(void *wqueue);
|
||||
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
|
|||
{"wal", walInit, walCleanUp},
|
||||
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
|
||||
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
|
||||
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
|
||||
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
|
||||
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
|
||||
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
|
||||
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
|
||||
|
|
|
@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL;
|
|||
static void *tsDnodeClientRpc = NULL;
|
||||
|
||||
int32_t dnodeInitServer() {
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
|
||||
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
|
||||
|
|
|
@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0;
|
|||
static int32_t tsDnodeSubmitReqNum = 0;
|
||||
|
||||
int32_t dnodeInitShell() {
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
|
||||
|
||||
// the following message shall be treated as mnode write
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
|
||||
|
|
|
@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void *dnodeAllocateVnodeRqueue(void *pVnode) {
|
||||
void *dnodeAllocVReadQueue(void *pVnode) {
|
||||
pthread_mutex_lock(&readPool.mutex);
|
||||
taos_queue queue = taosOpenQueue();
|
||||
if (queue == NULL) {
|
||||
|
@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
|
|||
return queue;
|
||||
}
|
||||
|
||||
void dnodeFreeVnodeRqueue(void *rqueue) {
|
||||
void dnodeFreeVReadQueue(void *rqueue) {
|
||||
taosCloseQueue(rqueue);
|
||||
|
||||
// dynamically adjust the number of threads
|
||||
|
|
|
@ -15,74 +15,65 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taosmsg.h"
|
||||
#include "taoserror.h"
|
||||
#include "tutil.h"
|
||||
#include "tglobal.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "tsdb.h"
|
||||
#include "twal.h"
|
||||
#include "tdataformat.h"
|
||||
#include "tglobal.h"
|
||||
#include "tsync.h"
|
||||
#include "vnode.h"
|
||||
#include "dnodeInt.h"
|
||||
#include "syncInt.h"
|
||||
#include "dnodeVWrite.h"
|
||||
#include "dnodeMgmt.h"
|
||||
#include "dnodeInt.h"
|
||||
|
||||
typedef struct {
|
||||
taos_qall qall;
|
||||
taos_qset qset; // queue set
|
||||
pthread_t thread; // thread
|
||||
int32_t workerId; // worker ID
|
||||
taos_qall qall;
|
||||
taos_qset qset; // queue set
|
||||
int32_t workerId; // worker ID
|
||||
pthread_t thread; // thread
|
||||
} SWriteWorker;
|
||||
|
||||
typedef struct {
|
||||
SRspRet rspRet;
|
||||
int32_t processedCount;
|
||||
int32_t code;
|
||||
void *pCont;
|
||||
int32_t contLen;
|
||||
SRpcMsg rpcMsg;
|
||||
SRspRet rspRet;
|
||||
SRpcMsg rpcMsg;
|
||||
int32_t processedCount;
|
||||
int32_t code;
|
||||
int32_t contLen;
|
||||
void * pCont;
|
||||
} SWriteMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t max; // max number of workers
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
SWriteWorker *writeWorker;
|
||||
int32_t max; // max number of workers
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
SWriteWorker *worker;
|
||||
pthread_mutex_t mutex;
|
||||
} SWriteWorkerPool;
|
||||
|
||||
static SWriteWorkerPool tsVWriteWP;
|
||||
static void *dnodeProcessWriteQueue(void *param);
|
||||
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
|
||||
|
||||
SWriteWorkerPool wWorkerPool;
|
||||
int32_t dnodeInitVWrite() {
|
||||
tsVWriteWP.max = tsNumOfCores;
|
||||
tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max);
|
||||
if (tsVWriteWP.worker == NULL) return -1;
|
||||
pthread_mutex_init(&tsVWriteWP.mutex, NULL);
|
||||
|
||||
int32_t dnodeInitVnodeWrite() {
|
||||
wWorkerPool.max = tsNumOfCores;
|
||||
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
|
||||
if (wWorkerPool.writeWorker == NULL) return -1;
|
||||
pthread_mutex_init(&wWorkerPool.mutex, NULL);
|
||||
|
||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||
wWorkerPool.writeWorker[i].workerId = i;
|
||||
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
|
||||
tsVWriteWP.worker[i].workerId = i;
|
||||
}
|
||||
|
||||
dInfo("dnode write is initialized, max worker %d", wWorkerPool.max);
|
||||
dInfo("dnode vwrite is initialized, max worker %d", tsVWriteWP.max);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupVnodeWrite() {
|
||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
||||
void dnodeCleanupVWrite() {
|
||||
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
|
||||
SWriteWorker *pWorker = tsVWriteWP.worker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(pWorker->qset);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
||||
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
|
||||
SWriteWorker *pWorker = tsVWriteWP.worker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
taosFreeQall(pWorker->qall);
|
||||
|
@ -90,13 +81,13 @@ void dnodeCleanupVnodeWrite() {
|
|||
}
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&wWorkerPool.mutex);
|
||||
free(wWorkerPool.writeWorker);
|
||||
dInfo("dnode write is closed");
|
||||
pthread_mutex_destroy(&tsVWriteWP.mutex);
|
||||
tfree(tsVWriteWP.worker);
|
||||
dInfo("dnode vwrite is closed");
|
||||
}
|
||||
|
||||
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
|
||||
char *pCont = (char *)pMsg->pCont;
|
||||
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) {
|
||||
char *pCont = pMsg->pCont;
|
||||
|
||||
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
|
||||
SMsgDesc *pDesc = (SMsgDesc *)pCont;
|
||||
|
@ -111,7 +102,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
|
|||
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
|
||||
if (queue) {
|
||||
// put message into queue
|
||||
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
||||
SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
|
||||
pWrite->rpcMsg = *pMsg;
|
||||
pWrite->pCont = pCont;
|
||||
pWrite->contLen = pHead->contLen;
|
||||
|
@ -130,12 +121,12 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void *dnodeAllocateVnodeWqueue(void *pVnode) {
|
||||
pthread_mutex_lock(&wWorkerPool.mutex);
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
|
||||
void *dnodeAllocVWriteQueue(void *pVnode) {
|
||||
pthread_mutex_lock(&tsVWriteWP.mutex);
|
||||
SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
|
||||
void *queue = taosOpenQueue();
|
||||
if (queue == NULL) {
|
||||
pthread_mutex_unlock(&wWorkerPool.mutex);
|
||||
pthread_mutex_unlock(&tsVWriteWP.mutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -143,7 +134,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
|
|||
pWorker->qset = taosOpenQset();
|
||||
if (pWorker->qset == NULL) {
|
||||
taosCloseQueue(queue);
|
||||
pthread_mutex_unlock(&wWorkerPool.mutex);
|
||||
pthread_mutex_unlock(&tsVWriteWP.mutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -152,7 +143,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
|
|||
if (pWorker->qall == NULL) {
|
||||
taosCloseQset(pWorker->qset);
|
||||
taosCloseQueue(queue);
|
||||
pthread_mutex_unlock(&wWorkerPool.mutex);
|
||||
pthread_mutex_unlock(&tsVWriteWP.mutex);
|
||||
return NULL;
|
||||
}
|
||||
pthread_attr_t thAttr;
|
||||
|
@ -160,37 +151,35 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
|
|||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
|
||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||
dError("failed to create thread to process vwrite queue since %s", strerror(errno));
|
||||
taosFreeQall(pWorker->qall);
|
||||
taosCloseQset(pWorker->qset);
|
||||
taosCloseQueue(queue);
|
||||
queue = NULL;
|
||||
} else {
|
||||
dDebug("write worker:%d is launched", pWorker->workerId);
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
dDebug("dnode vwrite worker:%d is launched", pWorker->workerId);
|
||||
tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
} else {
|
||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&wWorkerPool.mutex);
|
||||
dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
|
||||
pthread_mutex_unlock(&tsVWriteWP.mutex);
|
||||
dDebug("pVnode:%p, dnode vwrite queue:%p is allocated", pVnode, queue);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
void dnodeFreeVnodeWqueue(void *wqueue) {
|
||||
void dnodeFreeVWriteQueue(void *wqueue) {
|
||||
taosCloseQueue(wqueue);
|
||||
|
||||
// dynamically adjust the number of threads
|
||||
}
|
||||
|
||||
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
|
||||
SWriteMsg *pWrite = (SWriteMsg *)param;
|
||||
if (pWrite == NULL) return;
|
||||
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
|
||||
if (param == NULL) return;
|
||||
SWriteMsg *pWrite = param;
|
||||
|
||||
if (code < 0) pWrite->code = code;
|
||||
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
|
||||
|
@ -215,44 +204,45 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
SWriteWorker *pWorker = (SWriteWorker *)param;
|
||||
SWriteMsg * pWrite;
|
||||
SWalHead * pHead;
|
||||
int32_t numOfMsgs;
|
||||
int type;
|
||||
void * pVnode, *item;
|
||||
SRspRet * pRspRet;
|
||||
void * pVnode;
|
||||
void * pItem;
|
||||
int32_t numOfMsgs;
|
||||
int32_t qtype;
|
||||
|
||||
dDebug("write worker:%d is running", pWorker->workerId);
|
||||
dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
|
||||
if (numOfMsgs == 0) {
|
||||
dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset);
|
||||
dDebug("qset:%p, dnode vwrite got no message from qset, exiting", pWorker->qset);
|
||||
break;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
pWrite = NULL;
|
||||
pRspRet = NULL;
|
||||
taosGetQitem(pWorker->qall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pWrite = (SWriteMsg *)item;
|
||||
taosGetQitem(pWorker->qall, &qtype, &pItem);
|
||||
if (qtype == TAOS_QTYPE_RPC) {
|
||||
pWrite = pItem;
|
||||
pRspRet = &pWrite->rspRet;
|
||||
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
|
||||
pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
|
||||
pHead->msgType = pWrite->rpcMsg.msgType;
|
||||
pHead->version = 0;
|
||||
pHead->len = pWrite->contLen;
|
||||
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
|
||||
taosMsg[pWrite->rpcMsg.msgType]);
|
||||
} else if (type == TAOS_QTYPE_CQ) {
|
||||
pHead = (SWalHead *)((char*)item + sizeof(SSyncHead));
|
||||
} else if (qtype == TAOS_QTYPE_CQ) {
|
||||
pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
|
||||
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
|
||||
pHead->version);
|
||||
} else {
|
||||
pHead = (SWalHead *)item;
|
||||
pHead = pItem;
|
||||
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
|
||||
pHead->version);
|
||||
}
|
||||
|
||||
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
|
||||
int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet);
|
||||
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
|
||||
pHead->version, tstrerror(code));
|
||||
|
||||
|
@ -267,17 +257,17 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
// browse all items, and process them one by one
|
||||
taosResetQitems(pWorker->qall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(pWorker->qall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pWrite = (SWriteMsg *)item;
|
||||
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
|
||||
} else if (type == TAOS_QTYPE_FWD) {
|
||||
pHead = (SWalHead *)item;
|
||||
taosGetQitem(pWorker->qall, &qtype, &pItem);
|
||||
if (qtype == TAOS_QTYPE_RPC) {
|
||||
pWrite = pItem;
|
||||
dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
|
||||
} else if (qtype == TAOS_QTYPE_FWD) {
|
||||
pHead = pItem;
|
||||
vnodeConfirmForward(pVnode, pHead->version, 0);
|
||||
taosFreeQitem(item);
|
||||
taosFreeQitem(pItem);
|
||||
vnodeRelease(pVnode);
|
||||
} else {
|
||||
taosFreeQitem(item);
|
||||
taosFreeQitem(pItem);
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
}
|
||||
|
@ -285,19 +275,3 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
UNUSED_FUNC
|
||||
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||
int32_t num = taosGetQueueNumber(pWorker->qset);
|
||||
|
||||
if (num > 0) {
|
||||
usleep(30000);
|
||||
sched_yield();
|
||||
} else {
|
||||
taosFreeQall(pWorker->qall);
|
||||
taosCloseQset(pWorker->qset);
|
||||
pWorker->qset = NULL;
|
||||
dDebug("write worker:%d is released", pWorker->workerId);
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
|
|||
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
|
||||
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
|
||||
|
||||
void *dnodeAllocateVnodeWqueue(void *pVnode);
|
||||
void dnodeFreeVnodeWqueue(void *queue);
|
||||
void *dnodeAllocateVnodeRqueue(void *pVnode);
|
||||
void dnodeFreeVnodeRqueue(void *rqueue);
|
||||
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
|
||||
void *dnodeAllocVWriteQueue(void *pVnode);
|
||||
void dnodeFreeVWriteQueue(void *wqueue);
|
||||
void *dnodeAllocVReadQueue(void *pVnode);
|
||||
void dnodeFreeVReadQueue(void *rqueue);
|
||||
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
|
||||
|
||||
int32_t dnodeAllocateMnodePqueue();
|
||||
void dnodeFreeMnodePqueue();
|
||||
|
|
|
@ -22,14 +22,14 @@ extern "C" {
|
|||
|
||||
#ifndef TAOS_OS_FUNC_ALLOC
|
||||
#define tmalloc(size) malloc(size)
|
||||
#define tcalloc(size) calloc(1, size)
|
||||
#define tcalloc(nmemb, size) calloc(nmemb, size)
|
||||
#define trealloc(p, size) realloc(p, size)
|
||||
#define tmemalign(alignment, size) malloc(size)
|
||||
#define tfree(p) free(p)
|
||||
#define tmemzero(p, size) memset(p, 0, size)
|
||||
#else
|
||||
void *tmalloc(int32_t size);
|
||||
void *tcalloc(int32_t size);
|
||||
void *tcalloc(int32_t nmemb, int32_t size);
|
||||
void *trealloc(void *p, int32_t size);
|
||||
void *tmemalign(int32_t alignment, int32_t size);
|
||||
void tfree(void *p);
|
||||
|
|
|
@ -31,10 +31,14 @@ extern "C" {
|
|||
} \
|
||||
}
|
||||
|
||||
int64_t taosRead(int32_t fd, void *buf, int64_t count);
|
||||
int64_t taosWrite(int32_t fd, void *buf, int64_t count);
|
||||
int64_t taosLSeek(int32_t fd, int64_t offset, int32_t whence);
|
||||
int64_t taosReadImp(int32_t fd, void *buf, int64_t count);
|
||||
int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
|
||||
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
|
||||
int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath);
|
||||
|
||||
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
|
||||
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
|
||||
#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence)
|
||||
#define taosClose(x) tclose(x)
|
||||
|
||||
// TAOS_OS_FUNC_FILE_SENDIFLE
|
||||
|
@ -42,12 +46,12 @@ int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
|
|||
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
|
||||
|
||||
#ifdef TAOS_RANDOM_FILE_FAIL
|
||||
void taosSetRandomFileFailFactor(int factor);
|
||||
void taosSetRandomFileFailFactor(int32_t factor);
|
||||
void taosSetRandomFileFailOutput(const char *path);
|
||||
#ifdef TAOS_RANDOM_FILE_FAIL_TEST
|
||||
ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
|
||||
ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
|
||||
off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line);
|
||||
int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
|
||||
int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
|
||||
int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line);
|
||||
#undef taosRead
|
||||
#undef taosWrite
|
||||
#undef taosLSeek
|
||||
|
|
|
@ -42,10 +42,10 @@ extern "C" {
|
|||
|
||||
#ifdef TAOS_RANDOM_NETWORK_FAIL
|
||||
#ifdef TAOS_RANDOM_NETWORK_FAIL_TEST
|
||||
ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags);
|
||||
ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen);
|
||||
ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count);
|
||||
ssize_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count);
|
||||
int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags);
|
||||
int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen);
|
||||
int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count);
|
||||
int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count);
|
||||
#undef taosSend
|
||||
#undef taosSendto
|
||||
#undef taosReadSocket
|
||||
|
|
|
@ -32,11 +32,11 @@ void *tmalloc(int32_t size) {
|
|||
return p;
|
||||
}
|
||||
|
||||
void *tcalloc(int32_t size) {
|
||||
void *p = calloc(1, size);
|
||||
void *tcalloc(int32_t nmemb, int32_t size) {
|
||||
void *p = calloc(nmemb, size);
|
||||
if (p == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno));
|
||||
uError("failed to calloc memory, nmemb:%d size:%d reason:%s", nmemb, size, strerror(errno));
|
||||
}
|
||||
|
||||
return p;
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
#ifdef TAOS_RANDOM_NETWORK_FAIL
|
||||
|
||||
ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags) {
|
||||
int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags) {
|
||||
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
|
||||
errno = ECONNRESET;
|
||||
return -1;
|
||||
|
@ -29,8 +29,8 @@ ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t
|
|||
return send(sockfd, buf, len, flags);
|
||||
}
|
||||
|
||||
ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr,
|
||||
socklen_t addrlen) {
|
||||
int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags,
|
||||
const struct sockaddr *dest_addr, socklen_t addrlen) {
|
||||
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
|
||||
errno = ECONNRESET;
|
||||
return -1;
|
||||
|
@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_
|
|||
return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
|
||||
}
|
||||
|
||||
ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) {
|
||||
int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) {
|
||||
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
|
||||
errno = ECONNRESET;
|
||||
return -1;
|
||||
|
@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) {
|
|||
return read(fd, buf, count);
|
||||
}
|
||||
|
||||
ssize_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count) {
|
||||
int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count) {
|
||||
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
|
||||
errno = EINTR;
|
||||
return -1;
|
||||
|
@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) {
|
|||
sigaction(SIGILL, &act, NULL);
|
||||
}
|
||||
|
||||
ssize_t taosReadFileRandomFail(int32_t fd, void *buf, size_t count, const char *file, uint32_t line) {
|
||||
int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) {
|
||||
if (random_file_fail_factor > 0) {
|
||||
if (rand() % random_file_fail_factor == 0) {
|
||||
errno = EIO;
|
||||
|
@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int32_t fd, void *buf, size_t count, const char *
|
|||
}
|
||||
}
|
||||
|
||||
return taosRead(fd, buf, count);
|
||||
return taosReadImp(fd, buf, count);
|
||||
}
|
||||
|
||||
ssize_t taosWriteFileRandomFail(int32_t fd, void *buf, size_t count, const char *file, uint32_t line) {
|
||||
int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) {
|
||||
if (random_file_fail_factor > 0) {
|
||||
if (rand() % random_file_fail_factor == 0) {
|
||||
errno = EIO;
|
||||
|
@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int32_t fd, void *buf, size_t count, const char
|
|||
}
|
||||
}
|
||||
|
||||
return taosWrite(fd, buf, count);
|
||||
return taosWriteImp(fd, buf, count);
|
||||
}
|
||||
|
||||
off_t taosLSeekRandomFail(int32_t fd, off_t offset, int32_t whence, const char *file, uint32_t line) {
|
||||
int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line) {
|
||||
if (random_file_fail_factor > 0) {
|
||||
if (rand() % random_file_fail_factor == 0) {
|
||||
errno = EIO;
|
||||
|
@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int32_t fd, off_t offset, int32_t whence, const char *
|
|||
}
|
||||
}
|
||||
|
||||
return taosLSeek(fd, offset, whence);
|
||||
return taosLSeekImp(fd, offset, whence);
|
||||
}
|
||||
|
||||
#endif //TAOS_RANDOM_FILE_FAIL
|
||||
|
|
|
@ -71,7 +71,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP
|
|||
return rename(fullPath, *dstPath);
|
||||
}
|
||||
|
||||
int64_t taosRead(int32_t fd, void *buf, int64_t count) {
|
||||
int64_t taosReadImp(int32_t fd, void *buf, int64_t count) {
|
||||
int64_t leftbytes = count;
|
||||
int64_t readbytes;
|
||||
char * tbuf = (char *)buf;
|
||||
|
@ -95,7 +95,7 @@ int64_t taosRead(int32_t fd, void *buf, int64_t count) {
|
|||
return count;
|
||||
}
|
||||
|
||||
int64_t taosWrite(int32_t fd, void *buf, int64_t n) {
|
||||
int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) {
|
||||
int64_t nleft = n;
|
||||
int64_t nwritten = 0;
|
||||
char * tbuf = (char *)buf;
|
||||
|
@ -115,6 +115,10 @@ int64_t taosWrite(int32_t fd, void *buf, int64_t n) {
|
|||
return n;
|
||||
}
|
||||
|
||||
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
|
||||
return (int64_t)tlseek(fd, (long)offset, whence);
|
||||
}
|
||||
|
||||
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE
|
||||
|
||||
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) {
|
||||
|
|
|
@ -13,10 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#define _DEFAULT_SOURCE
|
||||
#include <regex.h>
|
||||
|
||||
#define TAOS_RANDOM_FILE_FAIL_TEST
|
||||
|
||||
#include <regex.h>
|
||||
#include "os.h"
|
||||
#include "talgo.h"
|
||||
#include "tchecksum.h"
|
||||
|
|
|
@ -14,9 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#define TAOS_RANDOM_FILE_FAIL_TEST
|
||||
|
||||
#include "os.h"
|
||||
#include "talgo.h"
|
||||
#include "tchecksum.h"
|
||||
|
|
|
@ -14,9 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#define TAOS_RANDOM_FILE_FAIL_TEST
|
||||
|
||||
#include "os.h"
|
||||
#include "hash.h"
|
||||
#include "taoserror.h"
|
||||
|
|
|
@ -255,8 +255,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
|
||||
pVnode->fversion = pVnode->version;
|
||||
|
||||
pVnode->wqueue = dnodeAllocateVnodeWqueue(pVnode);
|
||||
pVnode->rqueue = dnodeAllocateVnodeRqueue(pVnode);
|
||||
pVnode->wqueue = dnodeAllocVWriteQueue(pVnode);
|
||||
pVnode->rqueue = dnodeAllocVReadQueue(pVnode);
|
||||
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
|
||||
vnodeCleanUp(pVnode);
|
||||
return terrno;
|
||||
|
@ -322,7 +322,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
syncInfo.getWalInfo = vnodeGetWalInfo;
|
||||
syncInfo.getFileInfo = vnodeGetFileInfo;
|
||||
syncInfo.writeToCache = vnodeWriteToQueue;
|
||||
syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp;
|
||||
syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
|
||||
syncInfo.notifyRole = vnodeNotifyRole;
|
||||
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
|
||||
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
|
||||
|
@ -409,12 +409,12 @@ void vnodeRelease(void *pVnodeRaw) {
|
|||
}
|
||||
|
||||
if (pVnode->wqueue) {
|
||||
dnodeFreeVnodeWqueue(pVnode->wqueue);
|
||||
dnodeFreeVWriteQueue(pVnode->wqueue);
|
||||
pVnode->wqueue = NULL;
|
||||
}
|
||||
|
||||
if (pVnode->rqueue) {
|
||||
dnodeFreeVnodeRqueue(pVnode->rqueue);
|
||||
dnodeFreeVReadQueue(pVnode->rqueue);
|
||||
pVnode->rqueue = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ void walCleanUp() {
|
|||
}
|
||||
|
||||
void *walOpen(char *path, SWalCfg *pCfg) {
|
||||
SWal *pWal = tcalloc(sizeof(SWal));
|
||||
SWal *pWal = tcalloc(1, sizeof(SWal));
|
||||
if (pWal == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#define TAOS_RANDOM_FILE_FAIL_TEST
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tchecksum.h"
|
||||
|
|
Loading…
Reference in New Issue