Merge branch 'develop' into feature/2.0tsdb
This commit is contained in:
commit
cc8bade653
|
@ -5280,8 +5280,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
|
||||||
int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
|
int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
|
||||||
char msg[512] = {0};
|
char msg[512] = {0};
|
||||||
|
|
||||||
if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) {
|
if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 2)) {
|
||||||
snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog);
|
snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog);
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1512,6 +1512,9 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
|
|
||||||
tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
|
tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
|
||||||
|
|
||||||
|
tfree(pState);
|
||||||
|
tfree(pSupporter);
|
||||||
|
|
||||||
// release data block data
|
// release data block data
|
||||||
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
|
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
|
||||||
|
|
||||||
|
|
|
@ -587,7 +587,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
|
||||||
assert(pCmd->numOfClause == 1);
|
assert(pCmd->numOfClause == 1);
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
|
|
||||||
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
|
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
|
||||||
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
|
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
|
||||||
strcpy(pTableMetaInfo->name, pDataBlock->tableId);
|
strcpy(pTableMetaInfo->name, pDataBlock->tableId);
|
||||||
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
|
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
|
||||||
|
@ -599,7 +599,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* the submit message consists of : [RPC header|message body|digest]
|
* the submit message consists of : [RPC header|message body|digest]
|
||||||
* the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs
|
* the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs
|
||||||
* additional space.
|
* additional space.
|
||||||
*/
|
*/
|
||||||
int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100);
|
int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100);
|
||||||
|
@ -1277,7 +1277,7 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) {
|
||||||
|
|
||||||
|
|
||||||
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) {
|
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) {
|
||||||
if (src == NULL) {
|
if (src == NULL || src->numOfExprs == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1983,22 +1983,24 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd));
|
SSqlCmd* pnCmd = &pNew->cmd;
|
||||||
|
memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
|
||||||
|
|
||||||
pNew->cmd.command = cmd;
|
pnCmd->command = cmd;
|
||||||
pNew->cmd.payload = NULL;
|
pnCmd->payload = NULL;
|
||||||
pNew->cmd.allocSize = 0;
|
pnCmd->allocSize = 0;
|
||||||
|
|
||||||
pNew->cmd.pQueryInfo = NULL;
|
pnCmd->pQueryInfo = NULL;
|
||||||
pNew->cmd.numOfClause = 0;
|
pnCmd->numOfClause = 0;
|
||||||
pNew->cmd.clauseIndex = 0;
|
pnCmd->clauseIndex = 0;
|
||||||
|
pnCmd->pDataBlocks = NULL;
|
||||||
|
|
||||||
if (tscAddSubqueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) {
|
if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
|
||||||
tscFreeSqlObj(pNew);
|
tscFreeSqlObj(pNew);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0);
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||||
|
|
||||||
memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo));
|
memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo));
|
||||||
|
@ -2018,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
||||||
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
|
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
|
if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
|
||||||
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex);
|
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex);
|
||||||
tscFreeSqlObj(pNew);
|
tscFreeSqlObj(pNew);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
|
||||||
static int32_t dnodeOpenVnodes();
|
static int32_t dnodeOpenVnodes();
|
||||||
|
static void dnodeCloseVnodes();
|
||||||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
||||||
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
||||||
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
||||||
|
@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( vnodeInitModule() != TSDB_CODE_SUCCESS) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = dnodeOpenVnodes();
|
int32_t code = dnodeOpenVnodes();
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -88,7 +85,7 @@ void dnodeCleanupMgmt() {
|
||||||
tsDnodeTmr = NULL;
|
tsDnodeTmr = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeCleanupModule();
|
dnodeCloseVnodes();
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeMgmt(SRpcMsg *pMsg) {
|
void dnodeMgmt(SRpcMsg *pMsg) {
|
||||||
|
@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dnodeOpenVnodes() {
|
static int dnodeGetVnodeList(int32_t vnodeList[]) {
|
||||||
DIR *dir = opendir(tsVnodeDir);
|
DIR *dir = opendir(tsVnodeDir);
|
||||||
if (dir == NULL) {
|
if (dir == NULL) {
|
||||||
return TSDB_CODE_NO_WRITE_ACCESS;
|
return TSDB_CODE_NO_WRITE_ACCESS;
|
||||||
|
@ -122,26 +119,51 @@ static int32_t dnodeOpenVnodes() {
|
||||||
int32_t vnode = atoi(de->d_name + 5);
|
int32_t vnode = atoi(de->d_name + 5);
|
||||||
if (vnode == 0) continue;
|
if (vnode == 0) continue;
|
||||||
|
|
||||||
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
vnodeList[numOfVnodes] = vnode;
|
||||||
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name);
|
|
||||||
int32_t code = vnodeOpen(vnode, vnodeDir);
|
|
||||||
if (code == 0) {
|
|
||||||
numOfVnodes++;
|
numOfVnodes++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
|
||||||
dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes);
|
return numOfVnodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dnodeOpenVnodes() {
|
||||||
|
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
||||||
|
int failed = 0;
|
||||||
|
|
||||||
|
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
|
||||||
|
int numOfVnodes = dnodeGetVnodeList(vnodeList);
|
||||||
|
|
||||||
|
for (int i=0; i<numOfVnodes; ++i) {
|
||||||
|
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
|
||||||
|
if (vnodeOpen(vnodeList[i], vnodeDir) <0) failed++;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(vnodeList);
|
||||||
|
|
||||||
|
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dnodeCloseVnodes() {
|
||||||
|
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
|
||||||
|
int numOfVnodes = dnodeGetVnodeList(vnodeList);
|
||||||
|
|
||||||
|
for (int i=0; i<numOfVnodes; ++i)
|
||||||
|
vnodeClose(vnodeList[i]);
|
||||||
|
|
||||||
|
free(vnodeList);
|
||||||
|
dPrint("total vnodes:%d are all closed", numOfVnodes);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
|
|
||||||
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||||
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
||||||
|
pCreate->cfg.commitLog = pCreate->cfg.commitLog;
|
||||||
|
|
||||||
return vnodeCreate(pCreate);
|
return vnodeCreate(pCreate);
|
||||||
}
|
}
|
||||||
|
|
|
@ -291,7 +291,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
||||||
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
|
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
|
||||||
} else { // no further execution invoked, release the ref to vnode
|
} else { // no further execution invoked, release the ref to vnode
|
||||||
dnodeProcessReadResult(pVnode, pMsg);
|
dnodeProcessReadResult(pVnode, pMsg);
|
||||||
vnodeRelease(pVnode);
|
//vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
} SWriteMsg;
|
} SWriteMsg;
|
||||||
|
|
||||||
typedef struct _thread_obj {
|
typedef struct _wworker_pool {
|
||||||
int32_t max; // max number of workers
|
int32_t max; // max number of workers
|
||||||
int32_t nextId; // from 0 to max-1, cyclic
|
int32_t nextId; // from 0 to max-1, cyclic
|
||||||
SWriteWorker *writeWorker;
|
SWriteWorker *writeWorker;
|
||||||
|
|
|
@ -25,13 +25,10 @@ typedef struct {
|
||||||
void *rsp;
|
void *rsp;
|
||||||
} SRspRet;
|
} SRspRet;
|
||||||
|
|
||||||
int32_t vnodeInitModule();
|
|
||||||
void vnodeCleanupModule();
|
|
||||||
|
|
||||||
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
|
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
|
||||||
int32_t vnodeDrop(int32_t vgId);
|
int32_t vnodeDrop(int32_t vgId);
|
||||||
int32_t vnodeOpen(int32_t vnode, char *rootDir);
|
int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
||||||
int32_t vnodeClose(void *pVnode);
|
int32_t vnodeClose(int32_t vgId);
|
||||||
|
|
||||||
void vnodeRelease(void *pVnode);
|
void vnodeRelease(void *pVnode);
|
||||||
void* vnodeGetVnode(int32_t vgId);
|
void* vnodeGetVnode(int32_t vgId);
|
||||||
|
|
|
@ -166,8 +166,8 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) {
|
static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) {
|
||||||
if (pCreate->commitLog < 0 || pCreate->commitLog > 1) {
|
if (pCreate->commitLog < 0 || pCreate->commitLog > 2) {
|
||||||
mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog);
|
mError("invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog);
|
||||||
return TSDB_CODE_INVALID_OPTION;
|
return TSDB_CODE_INVALID_OPTION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -490,6 +490,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
||||||
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
|
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
|
||||||
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
|
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
|
||||||
pCfg->commitTime = htonl(pCfg->commitTime);
|
pCfg->commitTime = htonl(pCfg->commitTime);
|
||||||
|
pCfg->commitLog = pCfg->commitLog;
|
||||||
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
|
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
|
||||||
pCfg->replications = (char) pVgroup->numOfVnodes;
|
pCfg->replications = (char) pVgroup->numOfVnodes;
|
||||||
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
|
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
|
||||||
|
|
|
@ -24,7 +24,6 @@
|
||||||
#include "lz4.h"
|
#include "lz4.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
#include "shash.h"
|
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "rpcUdp.h"
|
#include "rpcUdp.h"
|
||||||
#include "rpcCache.h"
|
#include "rpcCache.h"
|
||||||
|
@ -263,7 +262,6 @@ void *rpcOpen(SRpcInit *pInit) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRpc->connType == TAOS_CONN_SERVER) {
|
if (pRpc->connType == TAOS_CONN_SERVER) {
|
||||||
// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
|
|
||||||
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
|
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
|
||||||
if (pRpc->hash == NULL) {
|
if (pRpc->hash == NULL) {
|
||||||
tError("%s failed to init string hash", pRpc->label);
|
tError("%s failed to init string hash", pRpc->label);
|
||||||
|
@ -298,7 +296,6 @@ void rpcClose(void *param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// taosCleanUpStrHash(pRpc->hash);
|
|
||||||
taosHashCleanup(pRpc->hash);
|
taosHashCleanup(pRpc->hash);
|
||||||
taosTmrCleanUp(pRpc->tmrCtrl);
|
taosTmrCleanUp(pRpc->tmrCtrl);
|
||||||
taosIdPoolCleanUp(pRpc->idPool);
|
taosIdPoolCleanUp(pRpc->idPool);
|
||||||
|
@ -535,8 +532,7 @@ static void rpcCloseConn(void *thandle) {
|
||||||
if ( pRpc->connType == TAOS_CONN_SERVER) {
|
if ( pRpc->connType == TAOS_CONN_SERVER) {
|
||||||
char hashstr[40] = {0};
|
char hashstr[40] = {0};
|
||||||
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
|
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
|
||||||
// taosDeleteStrHash(pRpc->hash, hashstr);
|
taosHashRemove(pRpc->hash, hashstr, size);
|
||||||
// taosHashRemove(pRpc->hash, hashstr, size);
|
|
||||||
|
|
||||||
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
|
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
|
||||||
pConn->pRspMsg = NULL;
|
pConn->pRspMsg = NULL;
|
||||||
|
@ -588,7 +584,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
|
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
|
||||||
|
|
||||||
// check if it is already allocated
|
// check if it is already allocated
|
||||||
// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
|
|
||||||
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
|
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
|
||||||
if (ppConn) pConn = *ppConn;
|
if (ppConn) pConn = *ppConn;
|
||||||
if (pConn) return pConn;
|
if (pConn) return pConn;
|
||||||
|
@ -621,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
pConn->localPort = (pRpc->localPort + pRpc->index);
|
pConn->localPort = (pRpc->localPort + pRpc->index);
|
||||||
}
|
}
|
||||||
|
|
||||||
// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
|
|
||||||
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
|
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
|
||||||
|
|
||||||
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
|
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
|
||||||
|
@ -834,13 +828,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
||||||
if (pConn->inType) {
|
if (pConn->inType) {
|
||||||
// if there are pending request, notify the app
|
// if there are pending request, notify the app
|
||||||
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
|
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
|
||||||
|
/*
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
rpcMsg.pCont = NULL;
|
rpcMsg.pCont = NULL;
|
||||||
rpcMsg.contLen = 0;
|
rpcMsg.contLen = 0;
|
||||||
rpcMsg.handle = pConn;
|
rpcMsg.handle = pConn;
|
||||||
rpcMsg.msgType = pConn->inType;
|
rpcMsg.msgType = pConn->inType;
|
||||||
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
// (*(pRpc->cfp))(&rpcMsg);
|
(*(pRpc->cfp))(&rpcMsg);
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcCloseConn(pConn);
|
rpcCloseConn(pConn);
|
||||||
|
@ -1163,13 +1159,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
|
||||||
if (pConn->inType && pRpc->cfp) {
|
if (pConn->inType && pRpc->cfp) {
|
||||||
// if there are pending request, notify the app
|
// if there are pending request, notify the app
|
||||||
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
|
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
|
||||||
|
/*
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
rpcMsg.pCont = NULL;
|
rpcMsg.pCont = NULL;
|
||||||
rpcMsg.contLen = 0;
|
rpcMsg.contLen = 0;
|
||||||
rpcMsg.handle = pConn;
|
rpcMsg.handle = pConn;
|
||||||
rpcMsg.msgType = pConn->inType;
|
rpcMsg.msgType = pConn->inType;
|
||||||
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
// (*(pRpc->cfp))(&rpcMsg);
|
(*(pRpc->cfp))(&rpcMsg);
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
rpcCloseConn(pConn);
|
rpcCloseConn(pConn);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -632,7 +632,7 @@ static void doInitGlobalConfig() {
|
||||||
|
|
||||||
tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT,
|
tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
||||||
0, 1, 0, TSDB_CFG_UTYPE_NONE);
|
0, 2, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT,
|
tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
||||||
0, 2, 0, TSDB_CFG_UTYPE_NONE);
|
0, 2, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
|
|
|
@ -92,7 +92,7 @@ void *taosAllocateQitem(int size) {
|
||||||
void taosFreeQitem(void *param) {
|
void taosFreeQitem(void *param) {
|
||||||
if (param == NULL) return;
|
if (param == NULL) return;
|
||||||
|
|
||||||
//pTrace("item:%p is freed", param);
|
pTrace("item:%p is freed", param);
|
||||||
|
|
||||||
char *temp = (char *)param;
|
char *temp = (char *)param;
|
||||||
temp -= sizeof(STaosQnode);
|
temp -= sizeof(STaosQnode);
|
||||||
|
@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
|
||||||
queue->numOfItems++;
|
queue->numOfItems++;
|
||||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
|
|
||||||
//pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems);
|
pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems);
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
||||||
|
@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) {
|
||||||
*pitem = pNode->item;
|
*pitem = pNode->item;
|
||||||
*type = pNode->type;
|
*type = pNode->type;
|
||||||
num = 1;
|
num = 1;
|
||||||
// pTrace("item:%p is fetched, type:%d", *pitem, *type);
|
pTrace("item:%p is fetched, type:%d", *pitem, *type);
|
||||||
}
|
}
|
||||||
|
|
||||||
return num;
|
return num;
|
||||||
|
|
|
@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash;
|
||||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||||
static void vnodeBuildVloadMsg(char *pNode, void * param);
|
static void vnodeBuildVloadMsg(char *pNode, void * param);
|
||||||
|
|
||||||
int32_t vnodeInitModule() {
|
static int tsOpennedVnodes;
|
||||||
|
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
|
static void vnodeInit() {
|
||||||
|
|
||||||
vnodeInitWriteFp();
|
vnodeInitWriteFp();
|
||||||
|
|
||||||
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
|
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
|
||||||
if (tsDnodeVnodesHash == NULL) {
|
if (tsDnodeVnodesHash == NULL) {
|
||||||
dError("failed to init vnode list");
|
dError("failed to init vnode list");
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef void (*CleanupFp)(char *);
|
|
||||||
void vnodeCleanupModule() {
|
|
||||||
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
|
|
||||||
taosCleanUpIntHash(tsDnodeVnodesHash);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
pthread_once(&vnodeModuleInit, vnodeInit);
|
||||||
|
|
||||||
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
|
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
|
||||||
|
|
||||||
|
@ -93,7 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId);
|
dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
|
||||||
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
|
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) {
|
||||||
|
|
||||||
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
char temp[TSDB_FILENAME_LEN];
|
char temp[TSDB_FILENAME_LEN];
|
||||||
|
pthread_once(&vnodeModuleInit, vnodeInit);
|
||||||
|
|
||||||
SVnodeObj vnodeObj = {0};
|
SVnodeObj vnodeObj = {0};
|
||||||
vnodeObj.vgId = vnode;
|
vnodeObj.vgId = vnode;
|
||||||
|
@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->status = VN_STATUS_READY;
|
pVnode->status = VN_STATUS_READY;
|
||||||
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
||||||
|
|
||||||
|
tsOpennedVnodes++;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeClose(void *param) {
|
int32_t vnodeClose(int32_t vgId) {
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
|
||||||
|
if (pVnode == NULL) return 0;
|
||||||
|
|
||||||
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
|
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
|
||||||
pVnode->status = VN_STATUS_CLOSING;
|
pVnode->status = VN_STATUS_CLOSING;
|
||||||
|
@ -165,7 +164,10 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
|
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||||
|
|
||||||
if (refCount > 0) return;
|
if (refCount > 0) {
|
||||||
|
dTrace("pVnode:%p vgId:%d, release vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// remove read queue
|
// remove read queue
|
||||||
dnodeFreeRqueue(pVnode->rqueue);
|
dnodeFreeRqueue(pVnode->rqueue);
|
||||||
|
@ -180,6 +182,13 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
|
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
|
||||||
|
|
||||||
|
tsOpennedVnodes--;
|
||||||
|
if (tsOpennedVnodes <= 0) {
|
||||||
|
taosCleanUpIntHash(tsDnodeVnodesHash);
|
||||||
|
vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
tsDnodeVnodesHash = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeGetVnode(int32_t vgId) {
|
void *vnodeGetVnode(int32_t vgId) {
|
||||||
|
@ -232,10 +241,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeCleanUp(SVnodeObj *pVnode) {
|
static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
if (pVnode->status == VN_STATUS_DELETING) {
|
|
||||||
// fix deadlock occured while close system
|
|
||||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||||
}
|
|
||||||
|
|
||||||
//syncStop(pVnode->sync);
|
//syncStop(pVnode->sync);
|
||||||
tsdbCloseRepo(pVnode->tsdb);
|
tsdbCloseRepo(pVnode->tsdb);
|
||||||
|
|
|
@ -255,7 +255,8 @@ int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) {
|
||||||
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
||||||
memcpy(pWal, pHead, size);
|
memcpy(pWal, pHead, size);
|
||||||
|
|
||||||
taosWriteQitem(pVnode->wqueue, type, pHead);
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
|
taosWriteQitem(pVnode->wqueue, type, pWal);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -287,6 +287,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
|
||||||
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
|
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
free(buffer);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue