commit
49da63bed2
|
@ -174,7 +174,7 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
|
||||||
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db,
|
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db,
|
||||||
uint16_t port) {
|
uint16_t port) {
|
||||||
STscObj *pObj = NULL;
|
STscObj *pObj = NULL;
|
||||||
SSqlObj *pSql = taosConnectImpl(ip, user, pass, auth, db, port, syncConnCallback, NULL, &pObj);
|
SSqlObj *pSql = taosConnectImpl(ip, user, pass, auth, db, port, syncConnCallback, NULL, (void **)&pObj);
|
||||||
if (pSql != NULL) {
|
if (pSql != NULL) {
|
||||||
pSql->fp = syncConnCallback;
|
pSql->fp = syncConnCallback;
|
||||||
pSql->param = pSql;
|
pSql->param = pSql;
|
||||||
|
@ -245,11 +245,11 @@ static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
|
||||||
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
||||||
void *param, TAOS **taos) {
|
void *param, TAOS **taos) {
|
||||||
STscObj *pObj = NULL;
|
STscObj *pObj = NULL;
|
||||||
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, &pObj);
|
SSqlObj *pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, (void **)&pObj);
|
||||||
if (pSql == NULL) {
|
if (pSql == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taos) *taos = pObj;
|
if (taos) *taos = pObj;
|
||||||
|
|
||||||
pSql->fetchFp = fp;
|
pSql->fetchFp = fp;
|
||||||
|
|
|
@ -22,13 +22,12 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct STscObj TAOS;
|
typedef void TAOS;
|
||||||
typedef struct STscStmt TAOS_STMT;
|
typedef void TAOS_STMT;
|
||||||
typedef struct SSqlObj TAOS_RES;
|
typedef void TAOS_RES;
|
||||||
typedef struct SSqlStream TAOS_STREAM;
|
typedef void TAOS_STREAM;
|
||||||
typedef struct SSub TAOS_SUB;
|
typedef void TAOS_SUB;
|
||||||
typedef unsigned char** TAOS_ROW;
|
typedef void **TAOS_ROW;
|
||||||
|
|
||||||
|
|
||||||
// Data type definition
|
// Data type definition
|
||||||
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
|
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
|
||||||
|
|
|
@ -56,6 +56,7 @@ void vnodeRelease(void *pVnode); // dec refCount
|
||||||
void* vnodeGetWal(void *pVnode);
|
void* vnodeGetWal(void *pVnode);
|
||||||
|
|
||||||
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
|
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
|
||||||
|
int32_t vnodeCheckWrite(void *pVnode);
|
||||||
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
|
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
|
||||||
void vnodeBuildStatusMsg(void *param);
|
void vnodeBuildStatusMsg(void *param);
|
||||||
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
|
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
|
||||||
|
@ -65,6 +66,7 @@ int32_t vnodeInitResources();
|
||||||
void vnodeCleanupResources();
|
void vnodeCleanupResources();
|
||||||
|
|
||||||
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
|
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
|
||||||
|
int32_t vnodeCheckRead(void *pVnode);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,14 +5,14 @@ INCLUDE_DIRECTORIES(inc)
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
||||||
IF (TD_LINUX)
|
IF (TD_LINUX)
|
||||||
LIST(REMOVE_ITEM SRC ./src/tarbitrator.c)
|
LIST(REMOVE_ITEM SRC src/tarbitrator.c)
|
||||||
ADD_LIBRARY(sync ${SRC})
|
ADD_LIBRARY(sync ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(sync tutil pthread common)
|
TARGET_LINK_LIBRARIES(sync tutil pthread common)
|
||||||
|
|
||||||
LIST(APPEND BIN_SRC ./src/tarbitrator.c)
|
LIST(APPEND BIN_SRC src/tarbitrator.c)
|
||||||
LIST(APPEND BIN_SRC ./src/taosTcpPool.c)
|
LIST(APPEND BIN_SRC src/taosTcpPool.c)
|
||||||
ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
|
ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
|
||||||
TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil)
|
TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil)
|
||||||
|
|
||||||
ADD_SUBDIRECTORY(test)
|
#ADD_SUBDIRECTORY(test)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
|
@ -465,9 +465,10 @@ void *vnodeAcquireRqueue(int32_t vgId) {
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
if (pVnode == NULL) return NULL;
|
if (pVnode == NULL) return NULL;
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_RESET) {
|
int32_t code = vnodeCheckRead(pVnode);
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
vInfo("vgId:%d, status is in reset", vgId);
|
terrno = code;
|
||||||
|
vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -479,13 +480,14 @@ void *vnodeAcquireWqueue(int32_t vgId) {
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
if (pVnode == NULL) return NULL;
|
if (pVnode == NULL) return NULL;
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_RESET) {
|
int32_t code = vnodeCheckWrite(pVnode);
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
vInfo("vgId:%d, status is in reset", vgId);
|
terrno = code;
|
||||||
|
vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pVnode->wqueue;
|
return pVnode->wqueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,13 @@ void vnodeInitReadFp(void) {
|
||||||
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
|
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
//
|
||||||
|
// After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are
|
||||||
|
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
|
||||||
|
// request enters the queue
|
||||||
|
//
|
||||||
|
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
int msgType = pReadMsg->rpcMsg.msgType;
|
int msgType = pReadMsg->rpcMsg.msgType;
|
||||||
|
|
||||||
if (vnodeProcessReadMsgFp[msgType] == NULL) {
|
if (vnodeProcessReadMsgFp[msgType] == NULL) {
|
||||||
|
@ -47,53 +53,35 @@ static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeCheckRead(void *param) {
|
||||||
|
SVnodeObj *pVnode = param;
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %s", pVnode->vgId, taosMsg[msgType],
|
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
||||||
vnodeStatus[pVnode->status]);
|
pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdb may be in reset state
|
// tsdb may be in reset state
|
||||||
if (pVnode->tsdb == NULL) {
|
if (pVnode->tsdb == NULL) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, tsdb is null", pVnode->vgId, taosMsg[msgType]);
|
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
|
|
||||||
vDebug("vgId:%d, msgType:%s not processed, vstatus is %s", pVnode->vgId, taosMsg[msgType],
|
|
||||||
vnodeStatus[pVnode->status]);
|
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
|
vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
|
||||||
pVnode->syncCfg.replica, syncRole[pVnode->role]);
|
syncRole[pVnode->role], pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
|
||||||
|
int32_t code = vnodeCheckRead(pVnode);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
|
|
||||||
int32_t vnodeProcessRead(void *param, SReadMsg *pRead) {
|
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
|
||||||
int32_t code = vnodeProcessReadImp(pVnode, pRead);
|
|
||||||
|
|
||||||
if (code == TSDB_CODE_APP_NOT_READY && pRead->rpcMsg.msgType == TSDB_MSG_TYPE_QUERY) {
|
|
||||||
// After the fetch request enters the vnode queue
|
|
||||||
// If the vnode cannot provide services, the following operations are still required
|
|
||||||
// Or, there will be a deadlock
|
|
||||||
void **qhandle = (void **)pRead->pCont;
|
|
||||||
vError("QInfo:%p msg:%p will be killed for vstatus is %s", *qhandle, pRead, vnodeStatus[pVnode->status]);
|
|
||||||
|
|
||||||
// qKillQuery(*qhandle);
|
|
||||||
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
|
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
|
||||||
} else {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
|
|
||||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||||
pRead->pCont = qhandle;
|
pRead->pCont = qhandle;
|
||||||
|
@ -104,6 +92,8 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *a
|
||||||
|
|
||||||
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
|
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
|
||||||
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
|
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -122,8 +112,13 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle,
|
||||||
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
|
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
|
||||||
if (continueExec) {
|
if (continueExec) {
|
||||||
*freeHandle = false;
|
*freeHandle = false;
|
||||||
vnodePutItemIntoReadQueue(pVnode, handle, ahandle);
|
code = vnodePutItemIntoReadQueue(pVnode, handle, ahandle);
|
||||||
pRet->qhandle = *handle;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
*freeHandle = true;
|
||||||
|
return code;
|
||||||
|
} else {
|
||||||
|
pRet->qhandle = *handle;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
*freeHandle = true;
|
*freeHandle = true;
|
||||||
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
|
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
|
||||||
|
@ -226,7 +221,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
if (handle != NULL) {
|
if (handle != NULL) {
|
||||||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
|
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
|
||||||
vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
|
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pRsp->code = code;
|
||||||
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
|
return pRsp->code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assert(pCont != NULL);
|
assert(pCont != NULL);
|
||||||
|
|
|
@ -56,15 +56,6 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
|
||||||
vDebug("vgId:%d, msgType:%s not processed, no write auth", pVnode->vgId, taosMsg[pHead->msgType]);
|
|
||||||
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
|
||||||
}
|
|
||||||
|
|
||||||
// tsdb may be in reset state
|
|
||||||
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
|
|
||||||
|
|
||||||
if (pHead->version == 0) { // from client or CQ
|
if (pHead->version == 0) { // from client or CQ
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
|
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
|
||||||
|
@ -105,6 +96,28 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
return syncCode;
|
return syncCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vnodeCheckWrite(void *param) {
|
||||||
|
SVnodeObj *pVnode = param;
|
||||||
|
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
||||||
|
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||||
|
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdb may be in reset state
|
||||||
|
if (pVnode->tsdb == NULL) {
|
||||||
|
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||||
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
|
||||||
|
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
||||||
|
pVnode->refCount, pVnode);
|
||||||
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
|
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
syncConfirmForward(pVnode->sync, version, code);
|
syncConfirmForward(pVnode->sync, version, code);
|
||||||
|
|
Loading…
Reference in New Issue