Merge branch '3.0' into 3.0test/jcy

This commit is contained in:
jiacy-jcy 2022-07-05 09:00:18 +08:00
commit 58aaf2c9cb
46 changed files with 1171 additions and 311 deletions

View File

@ -708,6 +708,7 @@ typedef struct {
int32_t buffer; // MB
int32_t pageSize;
int32_t pages;
int32_t lastRowMem;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
@ -736,6 +737,7 @@ typedef struct {
int32_t buffer;
int32_t pageSize;
int32_t pages;
int32_t lastRowMem;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
@ -1023,8 +1025,10 @@ typedef struct {
int64_t clusterId;
int64_t rebootTime;
int64_t updateTime;
int32_t numOfCores;
float numOfCores;
int32_t numOfSupportVnodes;
int64_t memTotal;
int64_t memAvail;
char dnodeEp[TSDB_EP_LEN];
SMnodeLoad mload;
SQnodeLoad qload;
@ -1079,6 +1083,7 @@ typedef struct {
int32_t buffer;
int32_t pageSize;
int32_t pages;
int32_t lastRowMem;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
@ -1131,6 +1136,7 @@ typedef struct {
int32_t buffer;
int32_t pageSize;
int32_t pages;
int32_t lastRowMem;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;

View File

@ -320,6 +320,7 @@ typedef struct SInterpFuncPhysiNode {
SNodeList* pFuncs;
STimeWindow timeRange;
int64_t interval;
int8_t intervalUnit;
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode

View File

@ -130,7 +130,7 @@ void schedulerStopQueryHb(void *pTrans);
* Free the query job
* @param pJob
*/
void schedulerFreeJob(int64_t job, int32_t errCode);
void schedulerFreeJob(int64_t* job, int32_t errCode);
void schedulerDestroy(void);

View File

@ -110,12 +110,15 @@ typedef struct {
} SRpcCtx;
int32_t rpcInit();
void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void * rpcMallocCont(int32_t contLen);
void rpcFreeCont(void *pCont);
void * rpcReallocCont(void *ptr, int32_t contLen);
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void rpcCloseImpl(void *);
void *rpcMallocCont(int32_t contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int32_t contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side

View File

@ -173,11 +173,12 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0341)
#define TSDB_CODE_MND_TOO_MANY_DNODES TAOS_DEF_ERROR_CODE(0, 0x0342)
#define TSDB_CODE_MND_NO_ENOUGH_DNODES TAOS_DEF_ERROR_CODE(0, 0x0343)
#define TSDB_CODE_MND_INVALID_CLUSTER_CFG TAOS_DEF_ERROR_CODE(0, 0x0344)
#define TSDB_CODE_MND_INVALID_CLUSTER_ID TAOS_DEF_ERROR_CODE(0, 0x0345)
#define TSDB_CODE_MND_INVALID_DNODE_CFG TAOS_DEF_ERROR_CODE(0, 0x0346)
#define TSDB_CODE_MND_INVALID_DNODE_EP TAOS_DEF_ERROR_CODE(0, 0x0347)
#define TSDB_CODE_MND_INVALID_DNODE_ID TAOS_DEF_ERROR_CODE(0, 0x0348)
#define TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0344)
#define TSDB_CODE_MND_INVALID_CLUSTER_CFG TAOS_DEF_ERROR_CODE(0, 0x0345)
#define TSDB_CODE_MND_INVALID_CLUSTER_ID TAOS_DEF_ERROR_CODE(0, 0x0346)
#define TSDB_CODE_MND_INVALID_DNODE_CFG TAOS_DEF_ERROR_CODE(0, 0x0347)
#define TSDB_CODE_MND_INVALID_DNODE_EP TAOS_DEF_ERROR_CODE(0, 0x0348)
#define TSDB_CODE_MND_INVALID_DNODE_ID TAOS_DEF_ERROR_CODE(0, 0x0349)
// mnode-node
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0350)

View File

@ -334,6 +334,9 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MIN_DB_LAST_ROW_MEM 1 // MB
#define TSDB_MAX_DB_LAST_ROW_MEM 65536
#define TSDB_DEFAULT_LAST_ROW_MEM 1
#define TSDB_DB_STREAM_MODE_OFF 0
#define TSDB_DB_STREAM_MODE_ON 1
#define TSDB_DEFAULT_DB_STREAM_MODE 0

View File

@ -65,7 +65,7 @@ enum {
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {
char* key;
char* key;
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
@ -177,14 +177,14 @@ typedef struct SReqResultInfo {
} SReqResultInfo;
typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now
__taos_async_fn_t queryFp;
__taos_async_fn_t fetchFp;
void* param;
SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG.
int32_t subplanNum;
SReqResultInfo resInfo;
tsem_t rspSem; // not used now
__taos_async_fn_t queryFp;
__taos_async_fn_t fetchFp;
void* param;
SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG.
int32_t subplanNum;
SReqResultInfo resInfo;
} SRequestSendRecvBody;
typedef struct {
@ -284,6 +284,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
extern SAppInfo appInfo;
extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool;
extern void* tscQhandle;
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
@ -301,7 +302,7 @@ void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
int32_t removeRequest(int64_t rid);
void doDestroyRequest(void *p);
void doDestroyRequest(void* p);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
@ -336,8 +337,9 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr);
void closeAllRequests(SHashObj *pRequests);
void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr);
void destroyAllRequests(SHashObj* pRequests);
void stopAllRequests(SHashObj* pRequests);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
@ -356,6 +358,9 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
bool qnodeRequired(SRequestObj* pRequest);
void initTscQhandle();
void cleanupTscQhandle();
#ifdef __cplusplus
}
#endif

View File

@ -25,6 +25,7 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#define TSC_VAR_NOT_RELEASE 1
@ -34,9 +35,20 @@ SAppInfo appInfo;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
void *tscQhandle = NULL;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
void initTscQhandle() {
// init handle
tscQhandle = taosInitScheduler(4096, 5, "tsc");
}
void cleanupTscQhandle() {
// destroy handle
taosCleanUpScheduler(tscQhandle);
}
static int32_t registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
if (NULL == pTscObj) {
@ -121,12 +133,31 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
return pDnodeConn;
}
void closeAllRequests(SHashObj *pRequests) {
void destroyAllRequests(SHashObj *pRequests) {
void *pIter = taosHashIterate(pRequests, NULL);
while (pIter != NULL) {
int64_t *rid = pIter;
removeRequest(*rid);
SRequestObj *pRequest = acquireRequest(*rid);
if (pRequest) {
destroyRequest(pRequest);
releaseRequest(*rid);
}
pIter = taosHashIterate(pRequests, pIter);
}
}
void stopAllRequests(SHashObj *pRequests) {
void *pIter = taosHashIterate(pRequests, NULL);
while (pIter != NULL) {
int64_t *rid = pIter;
SRequestObj *pRequest = acquireRequest(*rid);
if (pRequest) {
taos_stop_query(pRequest);
releaseRequest(*rid);
}
pIter = taosHashIterate(pRequests, pIter);
}
@ -153,12 +184,18 @@ void destroyAppInst(SAppInstInfo *pAppInfo) {
}
void destroyTscObj(void *pObj) {
if (NULL == pObj) {
return;
}
STscObj *pTscObj = pObj;
int64_t tscId = pTscObj->id;
tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
destroyAllRequests(pTscObj->pRequests);
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
pTscObj->pAppInfo->numOfConns);
@ -167,7 +204,9 @@ void destroyTscObj(void *pObj) {
destroyAppInst(pTscObj->pAppInfo);
}
taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj);
taosMemoryFree(pTscObj);
tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
}
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
@ -261,14 +300,18 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
void doDestroyRequest(void *p) {
assert(p != NULL);
if (NULL == p) {
return;
}
SRequestObj *pRequest = (SRequestObj *)p;
int64_t reqId = pRequest->self;
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob, 0);
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
@ -284,7 +327,9 @@ void doDestroyRequest(void *p) {
if (pRequest->self) {
deregisterRequest(pRequest);
}
taosMemoryFreeClear(pRequest);
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
}
void destroyRequest(SRequestObj *pRequest) {
@ -292,6 +337,8 @@ void destroyRequest(SRequestObj *pRequest) {
return;
}
taos_stop_query(pRequest);
removeRequest(pRequest->self);
}
@ -299,7 +346,7 @@ void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
initTscQhandle();
errno = TSDB_CODE_SUCCESS;
taosSeedRand(taosGetTimestampSec());

View File

@ -25,6 +25,7 @@
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "tsched.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
@ -56,14 +57,14 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
bool chkRequestKilled(void* param) {
bool killed = false;
bool killed = false;
SRequestObj* pRequest = acquireRequest((int64_t)param);
if (NULL == pRequest || pRequest->killed) {
killed = true;
}
releaseRequest((int64_t)param);
return killed;
}
@ -645,9 +646,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
pRequest->body.resInfo.execRes = res.res;
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob, 0);
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
pRequest->code = code;
terrno = code;
@ -658,9 +657,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob, 0);
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
}
pRequest->code = res.code;
@ -769,7 +766,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
case TDMT_SCH_QUERY:
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY: {
code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
@ -792,10 +789,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob, 0);
pRequest->body.queryJob = 0;
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
}
taosMemoryFree(pResult);
@ -1239,7 +1233,16 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
}
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
typedef struct SchedArg {
SRpcMsg msg;
SEpSet* pEpset;
} SchedArg;
void doProcessMsgFromServer(SSchedMsg* schedMsg) {
SchedArg* arg = (SchedArg*)schedMsg->ahandle;
SRpcMsg* pMsg = &arg->msg;
SEpSet* pEpSet = arg->pEpset;
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
STscObj* pTscObj = NULL;
@ -1272,7 +1275,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
SDataBuf buf = {.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
SDataBuf buf = {
.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
if (pMsg->contLen > 0) {
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
@ -1287,6 +1291,25 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg);
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SSchedMsg schedMsg = {0};
SEpSet* tEpSet = pEpSet != NULL ? taosMemoryCalloc(1, sizeof(SEpSet)) : NULL;
if (tEpSet != NULL) {
*tEpSet = *pEpSet;
}
SchedArg* arg = taosMemoryCalloc(1, sizeof(SchedArg));
arg->msg = *pMsg;
arg->pEpset = tEpSet;
schedMsg.fp = doProcessMsgFromServer;
schedMsg.ahandle = arg;
taosScheduleTask(tscQhandle, &schedMsg);
}
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
@ -1415,7 +1438,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&pParam->sem, 0, 0);
}
// convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4;

View File

@ -47,11 +47,9 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
atomic_store_32(&lock, 0);
return ret;
}
// this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) {
tscInfo("start to cleanup client environment");
tscInfo("start to cleanup client environment");
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return;
}
@ -74,8 +72,8 @@ void taos_cleanup(void) {
catalogDestroy();
schedulerDestroy();
cleanupTscQhandle();
rpcCleanup();
tscInfo("all local resources released");
taosCleanupCfg();
taosCloseLog();
@ -108,7 +106,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
if (pObj) {
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
*rid = pObj->id;
return (TAOS*)rid;
return (TAOS *)rid;
}
return NULL;
@ -196,10 +194,10 @@ void taos_kill_query(TAOS *taos) {
if (NULL == taos) {
return;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
closeAllRequests(pTscObj->pRequests);
int64_t rid = *(int64_t *)taos;
STscObj *pTscObj = acquireTscObj(rid);
stopAllRequests(pTscObj->pRequests);
releaseTscObj(rid);
}
@ -244,7 +242,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
#endif
} else if (TD_RES_TMQ(res)) {
SMqRspObj *msg = ((SMqRspObj *)res);
SMqRspObj * msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
@ -420,7 +418,7 @@ int taos_affected_rows(TAOS_RES *res) {
return 0;
}
SRequestObj *pRequest = (SRequestObj *)res;
SRequestObj * pRequest = (SRequestObj *)res;
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfRows;
}
@ -480,9 +478,7 @@ void taos_stop_query(TAOS_RES *res) {
return;
}
if (pRequest->body.queryJob) {
schedulerFreeJob(pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
}
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}
@ -606,7 +602,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
}
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
TAOS_FIELD * pField = &pResInfo->userFields[columnIndex];
if (!IS_VAR_DATA_TYPE(pField->type)) {
return 0;
}
@ -650,8 +646,8 @@ const char *taos_get_server_info(TAOS *taos) {
typedef struct SqlParseWrapper {
SParseContext *pCtx;
SCatalogReq catalogReq;
SRequestObj *pRequest;
SQuery *pQuery;
SRequestObj * pRequest;
SQuery * pQuery;
} SqlParseWrapper;
static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
@ -672,8 +668,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
tscDebug("enter meta callback, code %s", tstrerror(code));
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
SQuery * pQuery = pWrapper->pQuery;
SRequestObj * pRequest = pWrapper->pRequest;
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
@ -722,31 +718,29 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
**pCxt = (SParseContext){
.requestId = pRequest->requestId,
.requestRid = pRequest->self,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = false,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,
.svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)
};
**pCxt = (SParseContext){.requestId = pRequest->requestId,
.requestRid = pRequest->self,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = false,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,
.svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)};
return TSDB_CODE_SUCCESS;
}
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SParseContext *pCxt = NULL;
STscObj *pTscObj = pRequest->pTscObj;
STscObj * pTscObj = pRequest->pTscObj;
int32_t code = 0;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
@ -911,10 +905,10 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return terrno;
}
int64_t rid = *(int64_t*)taos;
int64_t rid = *(int64_t *)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0;
SRequestObj *pRequest = NULL;
SRequestObj * pRequest = NULL;
SCatalogReq catalogReq = {0};
if (NULL == tableNameList) {

View File

@ -947,8 +947,10 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->clusterId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->rebootTime) < 0) return -1;
if (tEncodeI64(&encoder, pReq->updateTime) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfCores) < 0) return -1;
if (tEncodeFloat(&encoder, pReq->numOfCores) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfSupportVnodes) < 0) return -1;
if (tEncodeI64(&encoder, pReq->memTotal) < 0) return -1;
if (tEncodeI64(&encoder, pReq->memAvail) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->dnodeEp) < 0) return -1;
// cluster cfg
@ -1008,8 +1010,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->clusterId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->rebootTime) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->updateTime) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfCores) < 0) return -1;
if (tDecodeFloat(&decoder, &pReq->numOfCores) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfSupportVnodes) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->memTotal) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->memAvail) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->dnodeEp) < 0) return -1;
// cluster cfg
@ -1974,6 +1978,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -2015,6 +2020,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
@ -2069,6 +2075,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -2094,6 +2101,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
@ -3586,6 +3594,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -3643,6 +3652,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
@ -3665,7 +3675,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
SReplica *pReplica = &pReq->replicas[i];
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
}
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
if (pReq->pRetensions == NULL) {
@ -3768,6 +3777,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -3782,7 +3792,6 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -3799,6 +3808,7 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;

View File

@ -67,6 +67,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.updateTime = pMgmt->pData->updateTime;
req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = tsNumOfSupportVnodes;
req.memTotal = tsTotalMemoryKB * 1024;
req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024;
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
req.clusterCfg.statusInterval = tsStatusInterval;

View File

@ -280,7 +280,7 @@ static void dmPrintEps(SDnodeData *pData) {
dDebug("print dnode list, num:%d", numOfEps);
for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pEp = taosArrayGet(pData->dnodeEps, i);
dDebug("dnode:%d, fqdn:%s port:%u is_mnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
}
}

View File

@ -30,12 +30,14 @@ void Testbase::InitLog(const char* path) {
tsdbDebugFlag = 0;
tsLogEmbedded = 1;
tsAsyncLog = 0;
tsRpcQueueMemoryAllowed = 1024 * 1024 * 64;
taosRemoveDir(path);
taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX);
if (taosInitLog("taosdlog", 1) != 0) {
taosGetSystemInfo();
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 0.1;
if (taosInitLog("taosdlog", 1) != 0) {
printf("failed to init log file\n");
}
}

View File

@ -148,7 +148,10 @@ typedef struct {
int32_t accessTimes;
int32_t numOfVnodes;
int32_t numOfSupportVnodes;
int32_t numOfCores;
float numOfCores;
int64_t memTotal;
int64_t memAvail;
int64_t memUsed;
EDndReason offlineReason;
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
@ -243,6 +246,7 @@ typedef struct {
int32_t buffer;
int32_t pageSize;
int32_t pages;
int32_t lastRowMem;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
@ -255,8 +259,8 @@ typedef struct {
int8_t compression;
int8_t replications;
int8_t strict;
int8_t cacheLastRow;
int8_t hashMethod; // default is 1
int8_t cacheLastRow;
int32_t numOfRetensions;
SArray* pRetensions;
int8_t schemaless;

View File

@ -30,6 +30,8 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void mndSortVnodeGid(SVgObj *pVgroup);
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId);
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);

View File

@ -93,6 +93,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.buffer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pageSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pages, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.lastRowMem, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1, _OVER)
@ -165,6 +166,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.buffer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pageSize, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pages, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.lastRowMem, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysPerFile, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep0, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep1, _OVER)
@ -230,8 +232,9 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
pOld->cfgVersion = pNew->cfgVersion;
pOld->vgVersion = pNew->vgVersion;
pOld->cfg.buffer = pNew->cfg.buffer;
pOld->cfg.pages = pNew->cfg.pages;
pOld->cfg.pageSize = pNew->cfg.pageSize;
pOld->cfg.pages = pNew->cfg.pages;
pOld->cfg.lastRowMem = pNew->cfg.lastRowMem;
pOld->cfg.daysPerFile = pNew->cfg.daysPerFile;
pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0;
pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1;
@ -288,6 +291,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->buffer < TSDB_MIN_BUFFER_PER_VNODE || pCfg->buffer > TSDB_MAX_BUFFER_PER_VNODE) return -1;
if (pCfg->pageSize < TSDB_MIN_PAGESIZE_PER_VNODE || pCfg->pageSize > TSDB_MAX_PAGESIZE_PER_VNODE) return -1;
if (pCfg->pages < TSDB_MIN_PAGES_PER_VNODE || pCfg->pages > TSDB_MAX_PAGES_PER_VNODE) return -1;
if (pCfg->lastRowMem < TSDB_MIN_DB_LAST_ROW_MEM || pCfg->lastRowMem > TSDB_MAX_DB_LAST_ROW_MEM) return -1;
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP) return -1;
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP) return -1;
@ -336,6 +340,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->replications < 0) pCfg->replications = TSDB_DEFAULT_DB_REPLICA;
if (pCfg->strict < 0) pCfg->strict = TSDB_DEFAULT_DB_STRICT;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
if (pCfg->lastRowMem <= 0) pCfg->lastRowMem = TSDB_DEFAULT_LAST_ROW_MEM;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
}
@ -434,6 +439,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.buffer = pCreate->buffer,
.pageSize = pCreate->pageSize,
.pages = pCreate->pages,
.lastRowMem = pCreate->lastRowMem,
.daysPerFile = pCreate->daysPerFile,
.daysToKeep0 = pCreate->daysToKeep0,
.daysToKeep1 = pCreate->daysToKeep1,
@ -475,7 +481,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
// mndTransSetSerial(pTrans);
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbName(pTrans, dbObj.name, NULL);
@ -622,6 +628,11 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
terrno = 0;
}
if (pAlter->lastRowMem > 0 && pAlter->lastRowMem != pDb->cfg.lastRowMem) {
pDb->cfg.lastRowMem = pAlter->lastRowMem;
terrno = 0;
}
if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) {
#if 1
terrno = TSDB_CODE_OPS_NOT_SUPPORT;

View File

@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE
#include "mndDnode.h"
#include "mndPrivilege.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndSnode.h"
@ -432,7 +432,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
if (!online) {
mInfo("dnode:%d, from offline to online", pDnode->id);
mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
} else {
mDebug("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
statusReq.dnodeVer, dnodeVer, reboot);
@ -441,6 +442,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->rebootTime = statusReq.rebootTime;
pDnode->numOfCores = statusReq.numOfCores;
pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
pDnode->memAvail = statusReq.memAvail;
pDnode->memTotal = statusReq.memTotal;
SStatusRsp statusRsp = {0};
statusRsp.dnodeVer = dnodeVer;
@ -580,7 +583,7 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
strcpy(info.name, "timezone");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "locale");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
taosArrayPush(rsp.variables, &info);

View File

@ -207,6 +207,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.buffer = pDb->cfg.buffer;
createReq.pageSize = pDb->cfg.pageSize;
createReq.pages = pDb->cfg.pages;
createReq.lastRowMem = pDb->cfg.lastRowMem;
createReq.daysPerFile = pDb->cfg.daysPerFile;
createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
@ -274,8 +275,9 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
SAlterVnodeReq alterReq = {0};
alterReq.vgVersion = pVgroup->version;
alterReq.buffer = pDb->cfg.buffer;
alterReq.pages = pDb->cfg.pages;
alterReq.pageSize = pDb->cfg.pageSize;
alterReq.pages = pDb->cfg.pages;
alterReq.lastRowMem = pDb->cfg.lastRowMem;
alterReq.daysPerFile = pDb->cfg.daysPerFile;
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
@ -392,9 +394,10 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
bool online = mndIsDnodeOnline(pDnode, curMs);
bool isMnode = mndIsMnode(pMnode, pDnode->id);
pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);
mDebug("dnode:%d, vnodes:%d support_vnodes:%d is_mnode:%d online:%d", pDnode->id, pDnode->numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, online);
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);
if (isMnode) {
pDnode->numOfVnodes++;
@ -426,15 +429,7 @@ static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) { return
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes;
float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes;
#if 0
if (d1Score == d2Score) {
return pDnode2->id - pDnode1->id;
} else {
return d1Score >= d2Score ? 1 : 0;
}
#else
return d1Score >= d2Score ? 1 : 0;
#endif
}
void mndSortVnodeGid(SVgObj *pVgroup) {
@ -447,7 +442,7 @@ void mndSortVnodeGid(SVgObj *pVgroup) {
}
}
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
int32_t allocedVnodes = 0;
void *pIter = NULL;
@ -470,6 +465,16 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
return -1;
}
int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
return -1;
} else {
pDnode->memUsed += vgMem;
}
pVgid->dnodeId = pDnode->id;
if (pVgroup->replica == 1) {
pVgid->role = TAOS_SYNC_STATE_LEADER;
@ -477,7 +482,8 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
}
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
pDnode->numOfVnodes++;
}
@ -498,7 +504,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
pVgroup->dbUid = pDb->uid;
pVgroup->replica = 1;
if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) return -1;
if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1;
mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
return 0;
@ -546,8 +552,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroup->dbUid = pDb->uid;
pVgroup->replica = pDb->cfg.replications;
if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) {
goto _OVER;
}
@ -728,6 +733,43 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return numOfVnodes;
}
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
SDbObj *pDb = pDbInput;
if (pDbInput == NULL) {
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
}
int64_t vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
if (pDb->cfg.cacheLastRow > 0) {
vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024;
}
if (pDbInput == NULL) {
mndReleaseDb(pMnode, pDb);
}
return vgroupMemroy;
}
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
int32_t dnodeId = *(int32_t *)p1;
int64_t *pVnodeMemory = (int64_t *)p2;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
*pVnodeMemory += mndGetVgroupMemory(pMnode, NULL, pVgroup);
}
}
return true;
}
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
int64_t vnodeMemory = 0;
sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &vnodeMemory, NULL);
return vnodeMemory;
}
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -807,9 +849,20 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
return -1;
}
int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
return -1;
} else {
pDnode->memUsed += vgMem;
}
pVgid->dnodeId = pDnode->id;
pVgid->role = TAOS_SYNC_STATE_ERROR;
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica, pVgid->dnodeId);
mInfo("db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
pVgroup->replica++;
pDnode->numOfVnodes++;
@ -835,7 +888,10 @@ int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
if (pVgid->dnodeId == pDnode->id) {
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
pDnode->memUsed -= vgMem;
mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
pDnode->numOfVnodes--;
pVgroup->replica--;
*pDelVgid = *pVgid;
@ -1161,6 +1217,17 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto _OVER;
}
int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
return -1;
} else {
pNew1->memUsed += vgMem;
}
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER;
}
@ -1173,6 +1240,15 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto _OVER;
}
int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
return -1;
} else {
pNew2->memUsed += vgMem;
}
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id) != 0) goto _OVER;
}
@ -1185,6 +1261,15 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto _OVER;
}
int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
return -1;
} else {
pNew3->memUsed += vgMem;
}
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id) != 0) goto _OVER;
}
@ -1415,7 +1500,7 @@ _OVER:
mndReleaseDb(pMnode, pDb);
return code;
#endif
#endif
}
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
@ -1654,9 +1739,9 @@ static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDno
}
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
int32_t code = -1;
int32_t numOfVgroups = 0;
STrans *pTrans = NULL;
int32_t code = -1;
int32_t numOfVgroups = 0;
STrans *pTrans = NULL;
SHashObj *pBalancedVgroups = NULL;
pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
@ -1721,7 +1806,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SArray *pArray = NULL;
void *pIter = NULL;
void *pIter = NULL;
int64_t curMs = taosGetTimestampMs();
SBalanceVgroupReq req = {0};
@ -1766,7 +1851,7 @@ _OVER:
taosArrayDestroy(pArray);
return code;
#endif
#endif
}
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }

View File

@ -29,13 +29,17 @@ extern "C" {
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s"
#define EXPLAIN_DISTBLK_SCAN_FORMAT "Block Dist Scan on %s"
#define EXPLAIN_LASTROW_SCAN_FORMAT "Last Row Scan on %s"
#define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate"
#define EXPLAIN_INDEF_ROWS_FORMAT "Indefinite Rows Function"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
#define EXPLAIN_SORT_FORMAT "Sort"
#define EXPLAIN_GROUP_SORT_FORMAT "Group Sort"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
#define EXPLAIN_MERGE_INTERVAL_FORMAT "Merge Interval on Column %s"
#define EXPLAIN_FILL_FORMAT "Fill"
#define EXPLAIN_SESSION_FORMAT "Session"
#define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s"
@ -62,10 +66,12 @@ extern "C" {
#define EXPLAIN_COST_FORMAT "cost=%.2f..%.2f"
#define EXPLAIN_ROWS_FORMAT "rows=%" PRIu64
#define EXPLAIN_COLUMNS_FORMAT "columns=%d"
#define EXPLAIN_PSEUDO_COLUMNS_FORMAT "pseudo_columns=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_TABLE_SCAN_FORMAT "order=[asc|%d desc|%d]"
#define EXPLAIN_GROUPS_FORMAT "groups=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_INTERVAL_VALUE_FORMAT "interval=%" PRId64 "%c"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
#define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64
#define EXPLAIN_MODE_FORMAT "mode=%s"

View File

@ -199,6 +199,31 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = mergePhysiNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: {
SBlockDistScanPhysiNode *distPhysiNode = (SBlockDistScanPhysiNode *)pNode;
pPhysiChildren = distPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
SLastRowScanPhysiNode *lastRowPhysiNode = (SLastRowScanPhysiNode *)pNode;
pPhysiChildren = lastRowPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SGroupSortPhysiNode *groupSortPhysiNode = (SGroupSortPhysiNode *)pNode;
pPhysiChildren = groupSortPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
SMergeIntervalPhysiNode *mergeIntPhysiNode = (SMergeIntervalPhysiNode *)pNode;
pPhysiChildren = mergeIntPhysiNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: {
SInterpFuncPhysiNode *interpPhysiNode = (SInterpFuncPhysiNode *)pNode;
pPhysiChildren = interpPhysiNode->node.pChildren;
break;
}
default:
qError("not supported physical node type %d", pNode->type);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
@ -378,6 +403,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTagScanNode->pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pTagScanNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
@ -415,6 +444,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pTblScanNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTblScanNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_TABLE_SCAN_FORMAT, pTblScanNode->scanSeq[0], pTblScanNode->scanSeq[1]);
@ -516,6 +549,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSTblScanNode->scan.pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pSTblScanNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pSTblScanNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
@ -1131,6 +1168,258 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: {
SBlockDistScanPhysiNode *pDistScanNode = (SBlockDistScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_DISTBLK_SCAN_FORMAT, pDistScanNode->tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pDistScanNode->pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pDistScanNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pDistScanNode->pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pDistScanNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pDistScanNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pDistScanNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
SLastRowScanPhysiNode *pLastRowNode = (SLastRowScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_LASTROW_SCAN_FORMAT, pLastRowNode->tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pLastRowNode->pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pLastRowNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pLastRowNode->pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pLastRowNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pLastRowNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pLastRowNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SGroupSortPhysiNode *pSortNode = (SGroupSortPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_GROUP_SORT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
SDataBlockDescNode *pDescNode = pSortNode->node.pOutputDataBlockDesc;
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pDescNode->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDescNode->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
// sort key
EXPLAIN_ROW_NEW(level + 1, "Sort Key: ");
if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) {
SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pSortNode->pSortKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
// sort method
EXPLAIN_ROW_NEW(level + 1, "Sort Method: ");
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
if (pExecInfo->sortBuffer > 1024 * 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
} else if (pExecInfo->sortBuffer > 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
} else {
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
}
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
}
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pSortNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pSortNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
SMergeIntervalPhysiNode *pIntNode = (SMergeIntervalPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_MERGE_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->window.pTspk));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pIntNode->window.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
uint8_t precision = getIntervalPrecision(pIntNode);
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT,
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision),
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision),
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, precision),
pIntNode->slidingUnit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pIntNode->window.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: {
SInterpFuncPhysiNode *pInterpNode = (SInterpFuncPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
if (pInterpNode->pFuncs) {
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pInterpNode->pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pInterpNode->fillMode));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pInterpNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pInterpNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pInterpNode->pFillValues) {
SNodeListNode *pValues = (SNodeListNode *)pInterpNode->pFillValues;
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_VALUE_FORMAT);
SNode *tNode = NULL;
int32_t i = 0;
FOREACH(tNode, pValues->pNodeList) {
if (i) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
SValueNode *tValue = (SValueNode *)tNode;
char *value = nodesGetStrValueFromNode(tValue);
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, value);
taosMemoryFree(value);
++i;
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_INTERVAL_VALUE_FORMAT, pInterpNode->interval, pInterpNode->intervalUnit);
EXPLAIN_ROW_END();
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pInterpNode->timeRange.skey, pInterpNode->timeRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pInterpNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pInterpNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
default:
qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_QRY_APP_ERROR;

View File

@ -2843,11 +2843,18 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
pScanInfo->pSnapshotReadOp->status = OP_OPENED;
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
if (uid == 0) {
pInfo->noTable = 1;
return TSDB_CODE_SUCCESS;
@ -2861,14 +2868,6 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pInfo->noTable = 0;
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
tsdbSetTableId(pInfo->dataReader, uid);
int64_t oldSkey = pInfo->cond.twindows[0].skey;
pInfo->cond.twindows[0].skey = ts + 1;
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->cond.twindows[0].skey = oldSkey;
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
@ -2880,8 +2879,17 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pInfo->currentTable = i;
}
}
// TODO after processing drop,
// TODO after processing drop, found can be false
ASSERT(found);
tsdbSetTableId(pInfo->dataReader, uid);
int64_t oldSkey = pInfo->cond.twindows[0].skey;
pInfo->cond.twindows[0].skey = ts + 1;
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->cond.twindows[0].skey = oldSkey;
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pInfo->currentTable, tableSz);
}

View File

@ -3138,6 +3138,7 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
pReq->buffer = pStmt->pOptions->buffer;
pReq->pageSize = -1;
pReq->pages = pStmt->pOptions->pages;
pReq->lastRowMem = -1;
pReq->daysPerFile = -1;
pReq->daysToKeep0 = pStmt->pOptions->keep[0];
pReq->daysToKeep1 = pStmt->pOptions->keep[1];

View File

@ -552,6 +552,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
} else {
pSubplan->execNode.nodeId = MNODE_HANDLE;
pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
}
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &node);

View File

@ -1018,14 +1018,14 @@ static int32_t unionMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
return TSDB_CODE_SUCCESS;
}
static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, ESubplanType subplanType) {
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return NULL;
}
pSubplan->id.queryId = pCxt->queryId;
pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->subplanType = subplanType;
pSubplan->pNode = pNode;
pNode->pParent = NULL;
return pSubplan;
@ -1039,7 +1039,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
SNode* pChild = NULL;
FOREACH(pChild, pSplitNode->pChildren) {
SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild);
SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild, pUnionSubplan->subplanType);
code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(NULL);

View File

@ -1035,3 +1035,72 @@ _return:
sclFreeRes(ctx.pRes);
return code;
}
int32_t scalarGetOperatorResultType(SDataType left, SDataType right, EOperatorType op, SDataType* pRes) {
switch (op) {
case OP_TYPE_ADD:
if (left.type == TSDB_DATA_TYPE_TIMESTAMP && right.type == TSDB_DATA_TYPE_TIMESTAMP) {
qError("invalid op %d, left type:%d, right type:%d", op, left.type, right.type);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if ((left.type == TSDB_DATA_TYPE_TIMESTAMP && (IS_INTEGER_TYPE(right.type) || right.type == TSDB_DATA_TYPE_BOOL)) ||
(right.type == TSDB_DATA_TYPE_TIMESTAMP && (IS_INTEGER_TYPE(left.type) || left.type == TSDB_DATA_TYPE_BOOL))) {
pRes->type = TSDB_DATA_TYPE_TIMESTAMP;
return TSDB_CODE_SUCCESS;
}
pRes->type = TSDB_DATA_TYPE_DOUBLE;
return TSDB_CODE_SUCCESS;
case OP_TYPE_SUB:
if ((left.type == TSDB_DATA_TYPE_TIMESTAMP && right.type == TSDB_DATA_TYPE_BIGINT) ||
(right.type == TSDB_DATA_TYPE_TIMESTAMP && left.type == TSDB_DATA_TYPE_BIGINT)) {
pRes->type = TSDB_DATA_TYPE_TIMESTAMP;
return TSDB_CODE_SUCCESS;
}
pRes->type = TSDB_DATA_TYPE_DOUBLE;
return TSDB_CODE_SUCCESS;
case OP_TYPE_MULTI:
if (left.type == TSDB_DATA_TYPE_TIMESTAMP && right.type == TSDB_DATA_TYPE_TIMESTAMP) {
qError("invalid op %d, left type:%d, right type:%d", op, left.type, right.type);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
case OP_TYPE_DIV:
if (left.type == TSDB_DATA_TYPE_TIMESTAMP && right.type == TSDB_DATA_TYPE_TIMESTAMP) {
qError("invalid op %d, left type:%d, right type:%d", op, left.type, right.type);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
case OP_TYPE_REM:
case OP_TYPE_MINUS:
pRes->type = TSDB_DATA_TYPE_DOUBLE;
return TSDB_CODE_SUCCESS;
case OP_TYPE_GREATER_THAN:
case OP_TYPE_GREATER_EQUAL:
case OP_TYPE_LOWER_THAN:
case OP_TYPE_LOWER_EQUAL:
case OP_TYPE_EQUAL:
case OP_TYPE_NOT_EQUAL:
case OP_TYPE_IN:
case OP_TYPE_NOT_IN:
case OP_TYPE_LIKE:
case OP_TYPE_NOT_LIKE:
case OP_TYPE_MATCH:
case OP_TYPE_NMATCH:
case OP_TYPE_IS_NULL:
case OP_TYPE_IS_NOT_NULL:
case OP_TYPE_IS_TRUE:
case OP_TYPE_JSON_CONTAINS:
pRes->type = TSDB_DATA_TYPE_BOOL;
return TSDB_CODE_SUCCESS;
case OP_TYPE_BIT_AND:
case OP_TYPE_BIT_OR:
pRes->type = TSDB_DATA_TYPE_BIGINT;
return TSDB_CODE_SUCCESS;
case OP_TYPE_JSON_GET_VALUE:
pRes->type = TSDB_DATA_TYPE_JSON;
return TSDB_CODE_SUCCESS;
default:
ASSERT(0);
return TSDB_CODE_APP_ERROR;
}
}

View File

@ -184,9 +184,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
}
if ((*pJob->chkKillFp)(pJob->chkKillParam)) {
schUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
return true;
}
@ -669,6 +667,11 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
/*
@ -811,14 +814,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*/
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(status));
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
pTask->maxExecTimes++;
if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
@ -1277,7 +1272,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
qDebug("QID:%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s",
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s",
taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status));
SSchJob *pJob = schAcquireJob(taskStatus->refId);
@ -1495,6 +1490,8 @@ void schFreeJobImpl(void *job) {
uint64_t queryId = pJob->queryId;
int64_t refId = pJob->refId;
qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
@ -1535,12 +1532,12 @@ void schFreeJobImpl(void *job) {
taosMemoryFreeClear(pJob->resData);
taosMemoryFree(pJob);
qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
if (jobNum == 0) {
schCloseJobRef();
}
qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
}
int32_t schLaunchStaticExplainJob(SSchedulerReq *pReq, SSchJob *pJob, bool sync) {
@ -1687,11 +1684,6 @@ _return:
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0;
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("redirect will no continue cause of job status %s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
}
if ((pTask->execId + 1) >= pTask->maxExecTimes) {
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);

View File

@ -402,12 +402,16 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
goto _return;
}
code = schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode);
schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode);
pMsg->pData = NULL;
_return:
if (pTask) {
if (code) {
schProcessOnTaskFailure(pJob, pTask, code);
}
SCH_UNLOCK_TASK(pTask);
}

View File

@ -225,26 +225,33 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb(pTrans);
}
void schedulerFreeJob(int64_t job, int32_t errCode) {
SSchJob *pJob = schAcquireJob(job);
void schedulerFreeJob(int64_t* job, int32_t errCode) {
if (0 == *job) {
return;
}
SSchJob *pJob = schAcquireJob(*job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
qError("acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *job);
*job = 0;
return;
}
int32_t code = schProcessOnJobDropped(pJob, errCode);
if (TSDB_CODE_SCH_JOB_IS_DROPPING == code) {
SCH_JOB_DLOG("sch job is already dropping, refId:0x%" PRIx64, job);
SCH_JOB_DLOG("sch job is already dropping, refId:0x%" PRIx64, *job);
*job = 0;
return;
}
SCH_JOB_DLOG("start to remove job from jobRef list, refId:0x%" PRIx64, job);
SCH_JOB_DLOG("start to remove job from jobRef list, refId:0x%" PRIx64, *job);
if (taosRemoveRef(schMgmt.jobRef, job)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, job);
if (taosRemoveRef(schMgmt.jobRef, *job)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, *job);
}
schReleaseJob(job);
schReleaseJob(*job);
*job = 0;
}
void schedulerDestroy(void) {

View File

@ -457,7 +457,7 @@ void schtFreeQueryJob(int32_t freeThread) {
int64_t job = queryJobRefId;
if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
schedulerFreeJob(job, 0);
schedulerFreeJob(&job, 0);
if (freeThread) {
if (++freeNum % schtTestPrintNum == 0) {
printf("FreeNum:%d\n", freeNum);
@ -724,7 +724,7 @@ TEST(queryTest, normalCase) {
schReleaseJob(job);
schedulerFreeJob(job, 0);
schedulerFreeJob(&job, 0);
schtFreeQueryDag(&dag);
@ -828,7 +828,7 @@ TEST(queryTest, readyFirstCase) {
schReleaseJob(job);
schedulerFreeJob(job, 0);
schedulerFreeJob(&job, 0);
schtFreeQueryDag(&dag);
@ -940,7 +940,7 @@ TEST(queryTest, flowCtrlCase) {
schReleaseJob(job);
schedulerFreeJob(job, 0);
schedulerFreeJob(&job, 0);
schtFreeQueryDag(&dag);
@ -994,7 +994,7 @@ TEST(insertTest, normalCase) {
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
schedulerFreeJob(insertJobRefId, 0);
schedulerFreeJob(&insertJobRefId, 0);
schedulerDestroy();
}

View File

@ -291,7 +291,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
uInfo("====> tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname);
uInfo("tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname);
taosRemoveDir(aname);
}
}

View File

@ -253,7 +253,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq);
do { \
if (id > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(id); \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, id); \
@ -261,7 +261,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq);
} \
} else if (id == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(id); \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
if (exh2 == NULL || id == exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \
exh2 ? exh2->refId : 0); \
@ -391,13 +391,16 @@ void transThreadOnce();
void transInit();
void transCleanup();
int32_t transOpenExHandleMgt(int size);
void transCloseExHandleMgt();
int64_t transAddExHandle(void* p);
int32_t transRemoveExHandle(int64_t refId);
SExHandle* transAcquireExHandle(int64_t refId);
int32_t transReleaseExHandle(int64_t refId);
void transDestoryExHandle(void* handle);
int32_t transOpenRefMgt(int size, void (*func)(void*));
void transCloseRefMgt(int32_t refMgt);
int64_t transAddExHandle(int32_t refMgt, void* p);
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId);
void* transAcquireExHandle(int32_t refMgt, int64_t refId);
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
void transDestoryExHandle(void* handle);
int32_t transGetRefMgt();
int32_t transGetInstMgt();
#ifdef __cplusplus
}

View File

@ -57,7 +57,7 @@ typedef struct {
void* parent;
void* tcphandle; // returned handle from TCP initialization
int32_t refMgt;
int64_t refId;
TdThreadMutex mutex;
} SRpcInfo;

View File

@ -76,16 +76,23 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
}
return pRpc;
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
transAcquireExHandle(transGetInstMgt(), refId);
pRpc->refId = refId;
return (void*)refId;
}
void rpcClose(void* arg) {
tInfo("start to close rpc");
transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
tInfo("finish to close rpc");
return;
}
void rpcCloseImpl(void* arg) {
SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
taosMemoryFree(pRpc);
tInfo("finish to close rpc");
return;
}
void* rpcMallocCont(int32_t contLen) {
@ -140,11 +147,10 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv(shandle, pEpSet, pMsg, pRsp);
}
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return 0; }
void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle);

View File

@ -47,6 +47,7 @@ typedef struct SCliMsg {
queue q;
STransMsgType type;
int64_t refId;
uint64_t st;
int sent; //(0: no send, 1: alread sent)
} SCliMsg;
@ -505,13 +506,13 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
}
static void allocConnRef(SCliConn* conn, bool update) {
if (update) {
transRemoveExHandle(conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
}
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(exh);
exh->refId = transAddExHandle(transGetRefMgt(), exh);
conn->refId = exh->refId;
}
static void addConnToPool(void* pool, SCliConn* conn) {
@ -604,16 +605,14 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn);
transRemoveExHandle(conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
uv_read_stop(conn->stream);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
//} else {
// cliDestroy((uv_handle_t*)conn->stream);
//}
}
}
static void cliDestroy(uv_handle_t* handle) {
@ -622,7 +621,7 @@ static void cliDestroy(uv_handle_t* handle) {
}
SCliConn* conn = handle->data;
transRemoveExHandle(conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip);
conn->stream->data = NULL;
taosMemoryFree(conn->stream);
@ -638,7 +637,6 @@ static bool cliHandleNoResp(SCliConn* conn) {
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) {
transQueuePop(&conn->cliMsgs);
// taosArrayRemove(msgs, 0);
destroyCmsg(pMsg);
res = true;
}
@ -749,7 +747,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
}
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)(pMsg->msg.info.handle);
SExHandle* exh = transAcquireExHandle(refId);
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
tDebug("%" PRId64 " already release", refId);
}
@ -775,14 +773,14 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
SCliConn* conn = NULL;
int64_t refId = (int64_t)(pMsg->msg.info.handle);
if (refId != 0) {
SExHandle* exh = transAcquireExHandle(refId);
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
*ignore = true;
destroyCmsg(pMsg);
return NULL;
} else {
conn = exh->handle;
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
}
return conn;
};
@ -984,6 +982,7 @@ void cliSendQuit(SCliThrd* thrd) {
}
void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_read_stop((uv_stream_t*)handle);
uv_close(handle, cliDestroy);
}
}
@ -1073,7 +1072,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (retry) {
pMsg->sent = 0;
pCtx->retryCnt += 1;
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn);
@ -1161,12 +1160,12 @@ void transUnrefCliHandle(void* handle) {
}
SCliThrd* transGetWorkThrdFromHandle(int64_t handle) {
SCliThrd* pThrd = NULL;
SExHandle* exh = transAcquireExHandle(handle);
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) {
return NULL;
}
pThrd = exh->pThrd;
transReleaseExHandle(handle);
transReleaseExHandle(transGetRefMgt(), handle);
return pThrd;
}
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
@ -1193,10 +1192,13 @@ void transReleaseCliHandle(void* handle) {
}
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle;
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
}
@ -1217,19 +1219,24 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
cliMsg->refId = (int64_t)shandle;
STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
}
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle;
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
@ -1250,6 +1257,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
cliMsg->refId = (int64_t)shandle;
STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
@ -1259,13 +1267,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tsem_wait(sem);
tsem_destroy(sem);
taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
}
/*
*
**/
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
STrans* pTransInst = shandle;
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) {
@ -1280,11 +1291,13 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx;
cliMsg->type = Update;
cliMsg->refId = (int64_t)shandle;
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
transAsyncSend(thrd->asyncPool, &(cliMsg->q));
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
}
#endif

View File

@ -19,6 +19,7 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt;
static int32_t instMgt;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
@ -479,53 +480,54 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
}
return true;
}
static int32_t transGetRefMgt() {
//
return refMgt;
}
static void transInitEnv() {
refMgt = transOpenExHandleMgt(50000);
refMgt = transOpenRefMgt(50000, transDestoryExHandle);
instMgt = taosOpenRef(50, rpcCloseImpl);
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
static void transDestroyEnv() {
// close ref
transCloseExHandleMgt();
transCloseRefMgt(refMgt);
transCloseRefMgt(instMgt);
}
void transInit() {
// init env
taosThreadOnce(&transModuleInit, transInitEnv);
}
int32_t transGetRefMgt() { return refMgt; }
int32_t transGetInstMgt() { return instMgt; }
void transCleanup() {
// clean env
transDestroyEnv();
}
int32_t transOpenExHandleMgt(int size) {
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
// added into once later
return taosOpenRef(size, transDestoryExHandle);
return taosOpenRef(size, func);
}
void transCloseExHandleMgt() {
void transCloseRefMgt(int32_t mgt) {
// close ref
taosCloseRef(transGetRefMgt());
taosCloseRef(mgt);
}
int64_t transAddExHandle(void* p) {
int64_t transAddExHandle(int32_t refMgt, void* p) {
// acquire extern handle
return taosAddRef(transGetRefMgt(), p);
return taosAddRef(refMgt, p);
}
int32_t transRemoveExHandle(int64_t refId) {
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
// acquire extern handle
return taosRemoveRef(transGetRefMgt(), refId);
return taosRemoveRef(refMgt, refId);
}
SExHandle* transAcquireExHandle(int64_t refId) {
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
// acquire extern handle
return (SExHandle*)taosAcquireRef(transGetRefMgt(), refId);
return (void*)taosAcquireRef(refMgt, refId);
}
int32_t transReleaseExHandle(int64_t refId) {
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
// release extern handle
return taosReleaseRef(transGetRefMgt(), refId);
return taosReleaseRef(refMgt, refId);
}
void transDestoryExHandle(void* handle) {
if (handle == NULL) {

View File

@ -261,7 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(pConn->refId);
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId;
@ -279,7 +279,7 @@ static void uvHandleReq(SSvrConn* pConn) {
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
transReleaseExHandle(pConn->refId);
transReleaseExHandle(transGetRefMgt(), pConn->refId);
STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
@ -507,15 +507,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(refId);
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
@ -757,8 +757,8 @@ static SSvrConn* createConn(void* hThrd) {
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = pConn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(exh);
transAcquireExHandle(exh->refId);
exh->refId = transAddExHandle(transGetRefMgt(), exh);
transAcquireExHandle(transGetRefMgt(), exh->refId);
pConn->refId = exh->refId;
transRefSrvHandle(pConn);
@ -789,14 +789,14 @@ static void destroyConnRegArg(SSvrConn* conn) {
}
}
static int reallocConnRef(SSvrConn* conn) {
transReleaseExHandle(conn->refId);
transRemoveExHandle(conn->refId);
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
// avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(exh);
transAcquireExHandle(exh->refId);
exh->refId = transAddExHandle(transGetRefMgt(), exh);
transAcquireExHandle(transGetRefMgt(), exh->refId);
conn->refId = exh->refId;
return 0;
@ -808,8 +808,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
}
SWorkThrd* thrd = conn->hostThrd;
transReleaseExHandle(conn->refId);
transRemoveExHandle(conn->refId);
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn);
transQueueDestroy(&conn->srvMsgs);
@ -1046,11 +1046,11 @@ void transReleaseSrvHandle(void* handle) {
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
return;
_return1:
tTrace("handle %p failed to send to release handle", exh);
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
return;
_return2:
tTrace("handle %p failed to send to release handle", exh);
@ -1075,12 +1075,12 @@ void transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
return;
_return1:
tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
return;
_return2:
tTrace("handle %p failed to send resp", exh);
@ -1104,13 +1104,13 @@ void transRegisterMsg(const STransMsg* msg) {
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
return;
_return1:
tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(refId);
transReleaseExHandle(transGetRefMgt(), refId);
return;
_return2:
tTrace("handle %p failed to register brokenlink", exh);

View File

@ -332,21 +332,25 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
if (index == pWal->vers.lastVer + 1) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->vers.firstVer = index;
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
} else {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
@ -355,6 +359,7 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
// reject skip log or rewrite log
// must truncate explicitly first
terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
@ -362,8 +367,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
ASSERT(pWal->writeCur >= 0);
taosThreadMutexLock(&pWal->mutex);
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteLogTFile == NULL) {
walSetWrite(pWal);
taosLSeekFile(pWal->pWriteLogTFile, 0, SEEK_END);

View File

@ -177,6 +177,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "Dnode already exists"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, "Dnode does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DNODES, "Too many dnodes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_ENOUGH_DNODES, "Out of dnodes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE, "No enough memory in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_CFG, "Cluster cfg inconsistent")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, "Cluster id not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_CFG, "Invalid dnode cfg")

View File

@ -18,6 +18,7 @@
#include "taoserror.h"
#include "tlog.h"
#include "tsched.h"
#include "tdef.h"
#define tmrFatal(...) \
{ \
@ -110,7 +111,7 @@ typedef struct time_wheel_t {
tmr_obj_t** slots;
} time_wheel_t;
static int32_t tsMaxTmrCtrl = 512;
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
static TdThreadOnce tmrModuleInit = PTHREAD_ONCE_INIT;
static TdThreadMutex tmrCtrlMutex;

View File

@ -52,6 +52,7 @@ typedef struct {
typedef struct SSP_CB_PARAM {
TAOS *taos;
bool fetch;
bool free;
int32_t *end;
} SSP_CB_PARAM;
@ -156,7 +157,58 @@ void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) {
}
}
int sqSyncStopQuery(bool fetch) {
void sqKillFetchCb(void *param, TAOS_RES *pRes, int numOfRows) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
taos_kill_query(qParam->taos);
*qParam->end = 1;
}
void sqKillQueryCb(void *param, TAOS_RES *pRes, int code) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
if (code == 0 && pRes) {
if (qParam->fetch) {
taos_fetch_rows_a(pRes, sqKillFetchCb, param);
} else {
taos_kill_query(qParam->taos);
*qParam->end = 1;
}
} else {
sqExit("select", taos_errstr(pRes));
}
}
void sqAsyncFetchCb(void *param, TAOS_RES *pRes, int numOfRows) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
if (numOfRows > 0) {
taos_fetch_rows_a(pRes, sqAsyncFetchCb, param);
} else {
*qParam->end = 1;
if (qParam->free) {
taos_free_result(pRes);
}
}
}
void sqAsyncQueryCb(void *param, TAOS_RES *pRes, int code) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
if (code == 0 && pRes) {
if (qParam->fetch) {
taos_fetch_rows_a(pRes, sqAsyncFetchCb, param);
} else {
if (qParam->free) {
taos_free_result(pRes);
}
*qParam->end = 1;
}
} else {
sqExit("select", taos_errstr(pRes));
}
}
int sqStopSyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
@ -189,7 +241,7 @@ int sqSyncStopQuery(bool fetch) {
CASE_LEAVE();
}
int sqAsyncStopQuery(bool fetch) {
int sqStopAsyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
@ -219,7 +271,7 @@ int sqAsyncStopQuery(bool fetch) {
CASE_LEAVE();
}
int sqSyncFreeQuery(bool fetch) {
int sqFreeSyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
@ -250,7 +302,7 @@ int sqSyncFreeQuery(bool fetch) {
CASE_LEAVE();
}
int sqAsyncFreeQuery(bool fetch) {
int sqFreeAsyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
@ -280,7 +332,7 @@ int sqAsyncFreeQuery(bool fetch) {
CASE_LEAVE();
}
int sqSyncCloseQuery(bool fetch) {
int sqCloseSyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
@ -310,7 +362,7 @@ int sqSyncCloseQuery(bool fetch) {
CASE_LEAVE();
}
int sqAsyncCloseQuery(bool fetch) {
int sqCloseAsyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
@ -360,9 +412,39 @@ void *syncQueryThreadFp(void *arg) {
taos_fetch_row(pRes);
}
taos_free_result(pRes);
if (qParam->free) {
taos_free_result(pRes);
}
}
void *asyncQueryThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
qParam->taos = taos;
sprintf(sql, "reset query cache");
sqExecSQLE(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQLE(taos, sql);
sprintf(sql, "select * from %s", tbName);
int32_t qEnd = 0;
SSP_CB_PARAM param = {0};
param.fetch = qParam->fetch;
param.end = &qEnd;
taos_query_a(taos, sql, sqAsyncQueryCb, &param);
while (0 == qEnd) {
usleep(5000);
}
}
void *closeThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
@ -376,7 +458,22 @@ void *closeThreadFp(void *arg) {
}
int sqConSyncCloseQuery(bool fetch) {
void *killThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
if (qParam->taos) {
usleep(rand() % 10000);
taos_kill_query(qParam->taos);
break;
}
usleep(1);
}
}
int sqConCloseSyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
@ -391,26 +488,160 @@ int sqConSyncCloseQuery(bool fetch) {
CASE_LEAVE();
}
int sqConCloseAsyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, asyncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, closeThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
}
CASE_LEAVE();
}
int sqKillSyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
TAOS_RES* pRes = taos_query(taos, sql);
code = taos_errno(pRes);
if (code) {
sqExit("taos_query", taos_errstr(pRes));
}
if (fetch) {
taos_fetch_row(pRes);
}
taos_kill_query(taos);
taos_close(taos);
}
CASE_LEAVE();
}
int sqKillAsyncQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
int32_t qEnd = 0;
SSP_CB_PARAM param = {0};
param.fetch = fetch;
param.end = &qEnd;
param.taos = taos;
taos_query_a(taos, sql, sqKillQueryCb, &param);
while (0 == qEnd) {
usleep(5000);
}
taos_close(taos);
}
CASE_LEAVE();
}
int sqConKillSyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, syncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, killThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
}
CASE_LEAVE();
}
int sqConKillAsyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, asyncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, killThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
}
CASE_LEAVE();
}
void sqRunAllCase(void) {
/*
sqSyncStopQuery(false);
sqSyncStopQuery(true);
sqAsyncStopQuery(false);
sqAsyncStopQuery(true);
sqStopSyncQuery(false);
sqStopSyncQuery(true);
sqStopAsyncQuery(false);
sqStopAsyncQuery(true);
sqSyncFreeQuery(false);
sqSyncFreeQuery(true);
sqAsyncFreeQuery(false);
sqAsyncFreeQuery(true);
sqFreeSyncQuery(false);
sqFreeSyncQuery(true);
sqFreeAsyncQuery(false);
sqFreeAsyncQuery(true);
sqSyncCloseQuery(false);
sqSyncCloseQuery(true);
sqAsyncCloseQuery(false);
sqAsyncCloseQuery(true);
*/
sqConSyncCloseQuery(false);
sqConSyncCloseQuery(true);
sqCloseSyncQuery(false);
sqCloseSyncQuery(true);
sqCloseAsyncQuery(false);
sqCloseAsyncQuery(true);
sqConCloseSyncQuery(false);
sqConCloseSyncQuery(true);
sqConCloseAsyncQuery(false);
sqConCloseAsyncQuery(true);
*/
#if 0
sqKillSyncQuery(false);
sqKillSyncQuery(true);
sqKillAsyncQuery(false);
sqKillAsyncQuery(true);
#endif
//sqConKillSyncQuery(false);
sqConKillSyncQuery(true);
#if 0
sqConKillAsyncQuery(false);
sqConKillAsyncQuery(true);
#endif
int32_t l = 5;
while (l) {
printf("%d\n", l--);
sleep(1);
}
}

View File

@ -121,7 +121,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG
echo "fqdn ${HOSTNAME}" >> $TAOS_CFG
echo "serverPort ${NODE}" >> $TAOS_CFG
echo "supportVnodes 128" >> $TAOS_CFG
echo "supportVnodes 1024" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
@ -135,7 +135,7 @@ echo "jniDebugFlag 143" >> $TAOS_CFG
echo "qDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "sDebugFlag 143" >> $TAOS_CFG
echo "wDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG

View File

@ -14,7 +14,7 @@ while [ -n "$PID" ]; do
echo kill -9 $PID
#pkill -9 taosd
kill -9 $PID
echo "Killing processes locking on port 6030"
echo "Killing taosd processes"
if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6030
else

View File

@ -7,39 +7,33 @@ system sh/deploy.sh -n dnode4 -i 4
return
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
system sh/deploy.sh -n dnode6 -i 6
system sh/deploy.sh -n dnode7 -i 7
system sh/deploy.sh -n dnode8 -i 8
system sh/deploy.sh -n dnode9 -i 9
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
system sh/exec.sh -n dnode5 -s start
system sh/exec.sh -n dnode6 -s start
system sh/exec.sh -n dnode7 -s start
system sh/exec.sh -n dnode8 -s start
system sh/exec.sh -n dnode9 -s start
sql connect
sleep 2000
$num = 200
$i = 0
while $i < $num
$port = $i + 8000
$i = $i + 1
sql create dnode $hostname port $port
endw
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
sql create dnode $hostname port 7500
sql create dnode $hostname port 7600
sql create dnode $hostname port 7700
sql create dnode $hostname port 7800
sql create dnode $hostname port 7900
$i = 0
while $i < $num
$port = $i + 8000
$i = $i + 1
$name = dnode . $port
system sh/deploy.sh -n $name -i 1
system sh/cfg.sh -n $name -c serverPort -v $port
system sh/exec.sh -n $name -s start
endw
sql show dnodes;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
return
sql create database db vgroups 1024 buffer 3;
sql use db;
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned);
sql create table ct1 using stb tags(1000);
sql create table ct2 using stb tags(1000) ;
sql show db.tables;
sql insert into ct1 values(now+0s, 10, 2.0, 3.0);

View File

@ -1,8 +1,33 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode1 -s start -v
sql connect
sql create database d0 vgroups 1;
print =============== step1: create drop show dnodes
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
goto _OVER
print =============== step2: create alter drop show user
sql create user u1 pass 'taosdata'
sql show users
sql alter user u1 sysinfo 1
sql alter user u1 enable 1
sql alter user u1 pass 'taosdata'
sql alter user u2 sysinfo 0
sql drop user u1
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT

@ -1 +1 @@
Subproject commit 7105027650b51e701cfa1dac11b8fb42d447dd01
Subproject commit 1163c0f60aa65d6cc58283247c8bf8c56ba43b92