Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0
This commit is contained in:
commit
37b55ef1ac
|
@ -660,7 +660,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
|||
}
|
||||
|
||||
// projection query on metric, pipeline retrieve data from vnode list,
|
||||
// instead of two-stage mergevnodeProcessMsgFromShell free qhandle
|
||||
// instead of two-stage mergednodeProcessMsgFromShell free qhandle
|
||||
nRows = taos_fetch_block_impl(res, rows);
|
||||
|
||||
// current subclause is completed, try the next subclause
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include "tsched.h"
|
||||
#include "dnode.h"
|
||||
|
||||
|
@ -30,6 +32,9 @@ void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj
|
|||
|
||||
extern void *dmQhandle;
|
||||
|
||||
void dnodeSendVpeerCfgMsg(int32_t vnode);
|
||||
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -20,12 +20,24 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "dnodeShell.h"
|
||||
|
||||
void dnodeFreeQInfoInQueue(SShellObj *pShellObj);
|
||||
|
||||
/*
|
||||
* Dnode handle read messages
|
||||
* The processing result is returned by callback function with pShellObj parameter
|
||||
*/
|
||||
int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj));
|
||||
|
||||
typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, SRetrieveMeterRsp *pRetrieveRspMsg, void *pShellObj);
|
||||
|
||||
void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -26,7 +26,6 @@ extern "C" {
|
|||
|
||||
typedef struct {
|
||||
int sid;
|
||||
int vnode;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
int32_t count; // track the number of imports
|
||||
|
@ -38,6 +37,8 @@ typedef struct {
|
|||
|
||||
int32_t dnodeInitShell();
|
||||
|
||||
void dnodeCleanupShell();
|
||||
|
||||
//SDnodeStatisInfo dnodeGetStatisInfo()
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -37,7 +37,8 @@ extern int32_t (*dnodeInitStorage)();
|
|||
extern void (*dnodeCleanupStorage)();
|
||||
extern void (*dnodeParseParameterK)();
|
||||
extern int32_t tsMaxQueues;
|
||||
|
||||
extern void ** tsRpcQhandle;
|
||||
extern void *tsQueryQhandle;
|
||||
|
||||
int32_t dnodeInitSystem();
|
||||
void dnodeCleanUpSystem();
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_DNODE_UTIL_H
|
||||
#define TDENGINE_DNODE_UTIL_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tstatus.h"
|
||||
|
||||
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode);
|
||||
|
||||
bool dnodeCheckVnodeExist(int32_t vnode);
|
||||
|
||||
void *dnodeGetVnodeObj(int32_t vnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -26,11 +26,13 @@ extern "C" {
|
|||
#include "taosmsg.h"
|
||||
|
||||
/*
|
||||
* Write data based on dnode
|
||||
* If >= 0, it is affect rows
|
||||
* If < 0, get error code from terrno
|
||||
* Write data based on dnode, the detail result can be fetched from rsponse
|
||||
* pSubmitMsg: Data to be written
|
||||
* pShellObj: Used to pass a communication handle
|
||||
* callback: Pass the write result through a callback function, possibly in a different thread space
|
||||
* rsp: will not be freed by callback function
|
||||
*/
|
||||
int32_t dnodeWriteData(SShellSubmitMsg *msg);
|
||||
void dnodeWriteData(SShellSubmitMsg *pMsg, void *pShellObj, void (*callback)(SShellSubmitRspMsg *rsp, void *pShellObj));
|
||||
|
||||
/*
|
||||
* Check if table already exists
|
||||
|
|
|
@ -548,7 +548,7 @@ int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void vnodeSendVpeerCfgMsg(int vnode) {
|
||||
void dnodeSendVpeerCfgMsg(int32_t vnode) {
|
||||
char * pMsg, *pStart;
|
||||
int msgLen;
|
||||
SVpeerCfgMsg *pCfg;
|
||||
|
@ -566,7 +566,7 @@ void vnodeSendVpeerCfgMsg(int vnode) {
|
|||
taosSendMsgToMnode(pObj, pStart, msgLen);
|
||||
}
|
||||
|
||||
int vnodeSendMeterCfgMsg(int vnode, int sid) {
|
||||
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
|
||||
char * pMsg, *pStart;
|
||||
int msgLen;
|
||||
SMeterCfgMsg *pCfg;
|
||||
|
|
|
@ -14,4 +14,48 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
#include "dnodeWrite.h"
|
||||
#include "dnode.h"
|
||||
#include "dnodeRead.h"
|
||||
#include "dnodeSystem.h"
|
||||
|
||||
void dnodeFreeQInfoInQueue(SShellObj *pShellObj) {
|
||||
}
|
||||
|
||||
|
||||
void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
|
||||
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
|
||||
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
|
||||
SShellObj *pObj = (SShellObj *)pSched->ahandle;
|
||||
SRetrieveMeterRsp result = {0};
|
||||
|
||||
/*
|
||||
* in case of server restart, apps may hold qhandle created by server before restart,
|
||||
* which is actually invalid, therefore, signature check is required.
|
||||
*/
|
||||
if (pRetrieve->qhandle != (uint64_t)pObj->qhandle) {
|
||||
// if free flag is set, client wants to clean the resources
|
||||
dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle);
|
||||
int32_t code = TSDB_CODE_INVALID_QHANDLE;
|
||||
(*callback)(code, &result, pObj);
|
||||
}
|
||||
|
||||
//TODO build response here
|
||||
|
||||
free(pSched->msg);
|
||||
}
|
||||
|
||||
void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback) {
|
||||
int8_t *msg = malloc(msgLen);
|
||||
memcpy(msg, pMsg, msgLen);
|
||||
|
||||
SSchedMsg schedMsg;
|
||||
schedMsg.msg = msg;
|
||||
schedMsg.ahandle = pShellObj;
|
||||
schedMsg.thandle = callback;
|
||||
schedMsg.fp = dnodeExecuteRetrieveData;
|
||||
taosScheduleTask(tsQueryQhandle, &schedMsg);
|
||||
}
|
||||
|
|
|
@ -19,37 +19,33 @@
|
|||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tlog.h"
|
||||
#include "tsocket.h"
|
||||
#include "tschemautil.h"
|
||||
#include "textbuffer.h"
|
||||
#include "trpc.h"
|
||||
#include "http.h"
|
||||
#include "dnode.h"
|
||||
#include "dnodeMgmt.h"
|
||||
#include "dnodeRead.h"
|
||||
#include "dnodeSystem.h"
|
||||
#include "dnodeShell.h"
|
||||
#include "dnodeUtil.h"
|
||||
#include "dnodeWrite.h"
|
||||
|
||||
static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj);
|
||||
static void dnodeProcessQueryRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj);
|
||||
static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj);
|
||||
|
||||
static void *tsDnodeShellServer = NULL;
|
||||
static SShellObj *tsDnodeShellList = NULL;
|
||||
static int32_t tsDnodeSelectReqNum = 0;
|
||||
static int32_t tsDnodeInsertReqNum = 0;
|
||||
static int32_t tsDnodeShellConns = 0;
|
||||
|
||||
int32_t vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj);
|
||||
int32_t vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj);
|
||||
int32_t vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj);
|
||||
#define NUM_OF_SESSIONS_PER_VNODE (300)
|
||||
#define NUM_OF_SESSIONS_PER_DNODE (NUM_OF_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
|
||||
|
||||
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId);
|
||||
|
||||
static void *pShellServer = NULL;
|
||||
static SShellObj **shellList = NULL;
|
||||
static int32_t dnodeSelectReqNum = 0;
|
||||
static int32_t dnodeInsertReqNum = 0;
|
||||
|
||||
typedef struct {
|
||||
int32_t import;
|
||||
int32_t vnode;
|
||||
int32_t numOfSid;
|
||||
int32_t ssid; // Start sid
|
||||
SShellObj *pObj;
|
||||
int64_t offset; // offset relative the blks
|
||||
char blks[];
|
||||
} SBatchSubmitInfo;
|
||||
|
||||
void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
||||
void *dnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
||||
int sid, vnode;
|
||||
SShellObj *pObj = (SShellObj *)ahandle;
|
||||
SIntMsg * pMsg = (SIntMsg *)msg;
|
||||
|
@ -61,11 +57,10 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
|||
if (pObj) {
|
||||
pObj->thandle = NULL;
|
||||
dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__);
|
||||
vnodeFreeQInfoInQueue(pObj->qhandle);
|
||||
dnodeFreeQInfoInQueue(pObj);
|
||||
pObj->qhandle = NULL;
|
||||
vnodeList[pObj->vnode].shellConns--;
|
||||
dTrace("vid:%d, shell connection:%d is gone, shellConns:%d", pObj->vnode, pObj->sid,
|
||||
vnodeList[pObj->vnode].shellConns);
|
||||
tsDnodeShellConns--;
|
||||
dTrace("shell connection:%d is gone, shellConns:%d", pObj->sid, tsDnodeShellConns);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
@ -73,53 +68,34 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
|||
taosGetRpcConnInfo(thandle, &peerId, &peerIp, &peerPort, &vnode, &sid);
|
||||
|
||||
if (pObj == NULL) {
|
||||
if (shellList[vnode]) {
|
||||
pObj = shellList[vnode] + sid;
|
||||
pObj->thandle = thandle;
|
||||
pObj->sid = sid;
|
||||
pObj->vnode = vnode;
|
||||
pObj->ip = peerIp;
|
||||
tinet_ntoa(ipstr, peerIp);
|
||||
vnodeList[pObj->vnode].shellConns++;
|
||||
dTrace("vid:%d, shell connection:%d from ip:%s is created, shellConns:%d", vnode, sid, ipstr,
|
||||
vnodeList[pObj->vnode].shellConns);
|
||||
} else {
|
||||
dError("vid:%d, vnode not there, shell connection shall be closed", vnode);
|
||||
return NULL;
|
||||
}
|
||||
pObj = tsDnodeShellList + sid;
|
||||
pObj->thandle = thandle;
|
||||
pObj->sid = sid;
|
||||
pObj->ip = peerIp;
|
||||
tinet_ntoa(ipstr, peerIp);
|
||||
tsDnodeShellConns--;
|
||||
dTrace("shell connection:%d from ip:%s is created, shellConns:%d", sid, ipstr, tsDnodeShellConns);
|
||||
} else {
|
||||
if (pObj != shellList[vnode] + sid) {
|
||||
dError("vid:%d, shell connection:%d, pObj:%p is not matched with:%p", vnode, sid, pObj, shellList[vnode] + sid);
|
||||
if (pObj != tsDnodeShellList + sid) {
|
||||
dError("shell connection:%d, pObj:%p is not matched with:%p", sid, pObj, tsDnodeShellList + sid);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
|
||||
|
||||
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("sid:%d, shell query msg is ignored since dnode not running", sid);
|
||||
return pObj;
|
||||
}
|
||||
|
||||
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
|
||||
vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell query msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
}
|
||||
dnodeProcessQueryRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
|
||||
vnodeProcessRetrieveRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell retrieve msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
}
|
||||
dnodeProcessRetrieveRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) {
|
||||
vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_REDIRECT);
|
||||
dTrace("vid:%d sid:%d, shell submit msg is redirect since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
} else {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
}
|
||||
dnodeProcessShellSubmitRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else {
|
||||
dError("%s is not processed", taosMsg[pMsg->msgType]);
|
||||
}
|
||||
|
@ -128,17 +104,13 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
|||
}
|
||||
|
||||
int32_t dnodeInitShell() {
|
||||
int size;
|
||||
SRpcInit rpcInit;
|
||||
|
||||
size = TSDB_MAX_VNODES * sizeof(SShellObj *);
|
||||
shellList = (SShellObj **)malloc(size);
|
||||
if (shellList == NULL) return -1;
|
||||
memset(shellList, 0, size);
|
||||
|
||||
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
|
||||
numOfThreads = (1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0;
|
||||
if (numOfThreads < 1) numOfThreads = 1;
|
||||
if (numOfThreads < 1) {
|
||||
numOfThreads = 1;
|
||||
}
|
||||
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
|
||||
|
@ -147,92 +119,51 @@ int32_t dnodeInitShell() {
|
|||
rpcInit.localPort = tsVnodeShellPort;
|
||||
rpcInit.label = "DND-shell";
|
||||
rpcInit.numOfThreads = numOfThreads;
|
||||
rpcInit.fp = vnodeProcessMsgFromShell;
|
||||
rpcInit.fp = dnodeProcessMsgFromShell;
|
||||
rpcInit.bits = TSDB_SHELL_VNODE_BITS;
|
||||
rpcInit.numOfChanns = TSDB_MAX_VNODES;
|
||||
rpcInit.sessionsPerChann = 16;
|
||||
rpcInit.idMgmt = TAOS_ID_FREE;
|
||||
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
|
||||
rpcInit.idleTime = tsShellActivityTimer * 2000;
|
||||
rpcInit.qhandle = rpcQhandle[0];
|
||||
rpcInit.efp = vnodeSendVpeerCfgMsg;
|
||||
rpcInit.qhandle = tsRpcQhandle[0];
|
||||
//rpcInit.efp = vnodeSendVpeerCfgMsg;
|
||||
|
||||
pShellServer = taosOpenRpc(&rpcInit);
|
||||
if (pShellServer == NULL) {
|
||||
tsDnodeShellServer = taosOpenRpc(&rpcInit);
|
||||
if (tsDnodeShellServer == NULL) {
|
||||
dError("failed to init connection to shell");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeOpenShellVnode(int vnode) {
|
||||
if (shellList[vnode] != NULL) {
|
||||
dError("vid:%d, shell is already opened", vnode);
|
||||
const int32_t size = NUM_OF_SESSIONS_PER_DNODE * sizeof(SShellObj);
|
||||
tsDnodeShellList = (SShellObj *)malloc(size);
|
||||
if (tsDnodeShellList == NULL) {
|
||||
dError("failed to allocate shellObj, sessions:%d", NUM_OF_SESSIONS_PER_DNODE);
|
||||
return -1;
|
||||
}
|
||||
memset(tsDnodeShellList, 0, size);
|
||||
|
||||
// TODO re initialize tsRpcQhandle
|
||||
if(taosOpenRpcChannWithQ(tsDnodeShellServer, 0, NUM_OF_SESSIONS_PER_DNODE, tsRpcQhandle) != TSDB_CODE_SUCCESS) {
|
||||
dError("sessions:%d, failed to open shell", NUM_OF_SESSIONS_PER_DNODE);
|
||||
return -1;
|
||||
}
|
||||
|
||||
const int32_t MIN_NUM_OF_SESSIONS = 300;
|
||||
|
||||
SVnodeCfg *pCfg = &vnodeList[vnode].cfg;
|
||||
int32_t sessions = (int32_t) MAX(pCfg->maxSessions * 1.1, MIN_NUM_OF_SESSIONS);
|
||||
|
||||
size_t size = sessions * sizeof(SShellObj);
|
||||
shellList[vnode] = (SShellObj *)calloc(1, size);
|
||||
if (shellList[vnode] == NULL) {
|
||||
dError("vid:%d, sessions:%d, failed to allocate shellObj, size:%d", vnode, pCfg->maxSessions, size);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(taosOpenRpcChannWithQ(pShellServer, vnode, sessions, rpcQhandle[(vnode+1)%tsMaxQueues]) != TSDB_CODE_SUCCESS) {
|
||||
dError("vid:%d, sessions:%d, failed to open shell", vnode, pCfg->maxSessions);
|
||||
return -1;
|
||||
}
|
||||
|
||||
dPrint("vid:%d, sessions:%d, shell is opened", vnode, pCfg->maxSessions);
|
||||
dError("sessions:%d, shell is opened", NUM_OF_SESSIONS_PER_DNODE);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void vnodeDelayedFreeResource(void *param, void *tmrId) {
|
||||
int32_t vnode = *(int32_t*) param;
|
||||
dTrace("vid:%d, start to free resources for 500ms arrived", vnode);
|
||||
|
||||
taosCloseRpcChann(pShellServer, vnode); // close connection
|
||||
tfree(shellList[vnode]); //free SShellObj
|
||||
tfree(param);
|
||||
|
||||
memset(vnodeList + vnode, 0, sizeof(SVnodeObj));
|
||||
dTrace("vid:%d, status set to %s", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
|
||||
vnodeCalcOpenVnodes();
|
||||
}
|
||||
|
||||
void vnodeCloseShellVnode(int vnode) {
|
||||
if (shellList[vnode] == NULL) return;
|
||||
|
||||
for (int i = 0; i < vnodeList[vnode].cfg.maxSessions; ++i) {
|
||||
void* qhandle = shellList[vnode][i].qhandle;
|
||||
if (qhandle != NULL) {
|
||||
vnodeDecRefCount(qhandle);
|
||||
}
|
||||
void dnodeCleanupShell() {
|
||||
if (tsDnodeShellServer) {
|
||||
taosCloseRpc(tsDnodeShellServer);
|
||||
}
|
||||
|
||||
int32_t* v = malloc(sizeof(int32_t));
|
||||
*v = vnode;
|
||||
for (int i = 0; i < NUM_OF_SESSIONS_PER_DNODE; ++i) {
|
||||
dnodeFreeQInfoInQueue(tsDnodeShellList+i);
|
||||
}
|
||||
|
||||
/*
|
||||
* free the connection related resource after 5sec.
|
||||
* 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash
|
||||
* 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access.
|
||||
*/
|
||||
dTrace("vid:%d, free resources in 500ms", vnode);
|
||||
taosTmrStart(vnodeDelayedFreeResource, 500, v, vnodeTmrCtrl);
|
||||
}
|
||||
|
||||
void vnodeCleanUpShell() {
|
||||
if (pShellServer) taosCloseRpc(pShellServer);
|
||||
|
||||
tfree(shellList);
|
||||
//tfree(tsDnodeShellList);
|
||||
}
|
||||
|
||||
int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) {
|
||||
|
@ -255,7 +186,7 @@ int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) {
|
|||
return msgLen;
|
||||
}
|
||||
|
||||
int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints) {
|
||||
int32_t dnodeSendShellSubmitRspMsg(SShellObj *pObj, int32_t code, int32_t numOfPoints) {
|
||||
char *pMsg, *pStart;
|
||||
int msgLen;
|
||||
|
||||
|
@ -395,344 +326,73 @@ _query_over:
|
|||
vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
|
||||
}
|
||||
|
||||
atomic_fetch_add_32(&dnodeSelectReqNum, 1);
|
||||
atomic_fetch_add_32(&tsDnodeSelectReqNum, 1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
|
||||
char * pMsg = pSched->msg;
|
||||
int msgLen;
|
||||
SShellObj *pObj = (SShellObj *)pSched->ahandle;
|
||||
|
||||
SRetrieveMeterMsg *pRetrieve;
|
||||
SRetrieveMeterRsp *pRsp;
|
||||
int numOfRows = 0, rowSize = 0, size = 0;
|
||||
int16_t timePrec = TSDB_TIME_PRECISION_MILLI;
|
||||
}
|
||||
|
||||
char *pStart;
|
||||
void dnodeProcessRetrieveRequestCb(int code, SRetrieveMeterRsp *result, SShellObj *pObj) {
|
||||
if (pObj == NULL || result == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
int code = 0;
|
||||
pRetrieve = (SRetrieveMeterMsg *)pMsg;
|
||||
SQInfo* pQInfo = (SQInfo*)pRetrieve->qhandle;
|
||||
pRetrieve->free = htons(pRetrieve->free);
|
||||
static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) {
|
||||
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pMsg;
|
||||
dnodeRetrieveData(pRetrieve, msgLen, pObj, dnodeProcessRetrieveRequestCb);
|
||||
}
|
||||
|
||||
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
dTrace("retrieve msg, handle:%p, free:%d", pRetrieve->qhandle, pRetrieve->free);
|
||||
} else {
|
||||
dTrace("retrieve msg to free resource from client, handle:%p, free:%d", pRetrieve->qhandle, pRetrieve->free);
|
||||
void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) {
|
||||
if (pObj == NULL || result == NULL || result->code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* in case of server restart, apps may hold qhandle created by server before restart,
|
||||
* which is actually invalid, therefore, signature check is required.
|
||||
*/
|
||||
if (pRetrieve->qhandle == (uint64_t)pObj->qhandle) {
|
||||
// if free flag is set, client wants to clean the resources
|
||||
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
code = vnodeRetrieveQueryInfo((void *)(pRetrieve->qhandle), &numOfRows, &rowSize, &timePrec);
|
||||
SShellObj *pShellObj = (SShellObj *) pObj;
|
||||
int32_t msgLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
|
||||
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) taosBuildRspMsgWithSize(pShellObj->thandle,
|
||||
TSDB_MSG_TYPE_SUBMIT_RSP, msgLen);
|
||||
if (submitRsp == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows);
|
||||
memcpy(submitRsp, result, msgLen);
|
||||
|
||||
for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) {
|
||||
SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i];
|
||||
if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) {
|
||||
dnodeSendVpeerCfgMsg(block->vnode);
|
||||
} else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) {
|
||||
dnodeSendMeterCfgMsg(block->vnode, block->sid);
|
||||
}
|
||||
} else {
|
||||
dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle);
|
||||
code = TSDB_CODE_INVALID_QHANDLE;
|
||||
block->vnode = htonl(block->vnode);
|
||||
block->sid = htonl(block->sid);
|
||||
block->code = htonl(block->code);
|
||||
}
|
||||
submitRsp->code = htonl(submitRsp->code);
|
||||
submitRsp->numOfRows = htonl(submitRsp->numOfRows);
|
||||
submitRsp->affectedRows = htonl(submitRsp->affectedRows);
|
||||
submitRsp->failedRows = htonl(submitRsp->failedRows);
|
||||
submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
size = vnodeGetResultSize((void *)(pRetrieve->qhandle), &numOfRows);
|
||||
|
||||
// buffer size for progress information, including meter count,
|
||||
// and for each meter, including 'uid' and 'TSKEY'.
|
||||
int progressSize = 0;
|
||||
if (pQInfo->pMeterQuerySupporter != NULL)
|
||||
progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t);
|
||||
else if (pQInfo->pObj != NULL)
|
||||
progressSize = sizeof(int64_t) + sizeof(TSKEY) + sizeof(int32_t);
|
||||
|
||||
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100);
|
||||
if (pStart == NULL) {
|
||||
taosSendSimpleRsp(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
pMsg = pStart;
|
||||
|
||||
*pMsg = code;
|
||||
pMsg++;
|
||||
|
||||
pRsp = (SRetrieveMeterRsp *)pMsg;
|
||||
pRsp->numOfRows = htonl(numOfRows);
|
||||
pRsp->precision = htons(timePrec);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pRsp->offset = htobe64(vnodeGetOffsetVal((void*)pRetrieve->qhandle));
|
||||
pRsp->useconds = htobe64(((SQInfo *)(pRetrieve->qhandle))->useconds);
|
||||
} else {
|
||||
pRsp->offset = 0;
|
||||
pRsp->useconds = 0;
|
||||
}
|
||||
|
||||
pMsg = pRsp->data;
|
||||
|
||||
if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) {
|
||||
vnodeSaveQueryResult((void *)(pRetrieve->qhandle), pRsp->data, &size);
|
||||
}
|
||||
|
||||
pMsg += size;
|
||||
|
||||
// write the progress information of each meter to response
|
||||
// this is required by subscriptions
|
||||
if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) {
|
||||
if (pQInfo->pMeterQuerySupporter != NULL && pQInfo->pMeterQuerySupporter->pMeterSidExtInfo != NULL) {
|
||||
*((int32_t *)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters);
|
||||
pMsg += sizeof(int32_t);
|
||||
for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) {
|
||||
*((int64_t *)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid);
|
||||
pMsg += sizeof(int64_t);
|
||||
*((TSKEY *)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key);
|
||||
pMsg += sizeof(TSKEY);
|
||||
}
|
||||
} else if (pQInfo->pObj != NULL) {
|
||||
*((int32_t *)pMsg) = htonl(1);
|
||||
pMsg += sizeof(int32_t);
|
||||
*((int64_t *)pMsg) = htobe64(pQInfo->pObj->uid);
|
||||
pMsg += sizeof(int64_t);
|
||||
if (pQInfo->pointsRead > 0) {
|
||||
*((TSKEY *)pMsg) = htobe64(pQInfo->query.lastKey + 1);
|
||||
} else {
|
||||
*((TSKEY *)pMsg) = htobe64(pQInfo->query.lastKey);
|
||||
}
|
||||
pMsg += sizeof(TSKEY);
|
||||
}
|
||||
}
|
||||
|
||||
msgLen = pMsg - pStart;
|
||||
|
||||
assert(code != TSDB_CODE_ACTION_IN_PROGRESS);
|
||||
|
||||
if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS) &&
|
||||
pRetrieve->qhandle != 0) {
|
||||
dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
|
||||
vnodeDecRefCount(pObj->qhandle);
|
||||
pObj->qhandle = NULL;
|
||||
}
|
||||
|
||||
taosSendMsgToPeer(pObj->thandle, pStart, msgLen);
|
||||
|
||||
_exit:
|
||||
free(pSched->msg);
|
||||
taosSendMsgToPeer(pShellObj->thandle, (int8_t*)submitRsp, msgLen);
|
||||
}
|
||||
|
||||
int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) {
|
||||
SSchedMsg schedMsg;
|
||||
|
||||
char *msg = malloc(msgLen);
|
||||
memcpy(msg, pMsg, msgLen);
|
||||
schedMsg.msg = msg;
|
||||
schedMsg.ahandle = pObj;
|
||||
schedMsg.fp = vnodeExecuteRetrieveReq;
|
||||
taosScheduleTask(queryQhandle, &schedMsg);
|
||||
|
||||
return msgLen;
|
||||
static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) {
|
||||
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pMsg;
|
||||
dnodeWriteData(pSubmit, pObj, dnodeProcessShellSubmitRequestCb);
|
||||
atomic_fetch_add_32(&tsDnodeInsertReqNum, 1);
|
||||
}
|
||||
|
||||
static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *pVnode) {
|
||||
int32_t sid = htonl(pBlocks->sid);
|
||||
uint64_t uid = htobe64(pBlocks->uid);
|
||||
|
||||
if (sid >= pVnode->cfg.maxSessions || sid <= 0) {
|
||||
dError("vid:%d sid:%d, sid is out of range", pVnode->vnode, sid);
|
||||
return TSDB_CODE_INVALID_TABLE_ID;
|
||||
}
|
||||
|
||||
SMeterObj *pMeterObj = pVnode->meterList[sid];
|
||||
if (pMeterObj == NULL) {
|
||||
dError("vid:%d sid:%d, not active table", pVnode->vnode, sid);
|
||||
vnodeSendMeterCfgMsg(pVnode->vnode, sid);
|
||||
return TSDB_CODE_NOT_ACTIVE_TABLE;
|
||||
}
|
||||
|
||||
if (pMeterObj->uid != uid) {
|
||||
dError("vid:%d sid:%d id:%s, uid:%" PRIu64 ", uid in msg:%" PRIu64 ", uid mismatch", pVnode->vnode, sid, pMeterObj->meterId,
|
||||
pMeterObj->uid, uid);
|
||||
return TSDB_CODE_INVALID_SUBMIT_MSG;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int vnodeDoSubmitJob(SVnodeObj *pVnode, int import, int32_t *ssid, int32_t esid, SShellSubmitBlock **ppBlocks,
|
||||
TSKEY now, SShellObj *pObj) {
|
||||
SShellSubmitBlock *pBlocks = *ppBlocks;
|
||||
int code = TSDB_CODE_SUCCESS;
|
||||
int32_t numOfPoints = 0;
|
||||
int32_t i = 0;
|
||||
SShellSubmitBlock tBlock;
|
||||
|
||||
for (i = *ssid; i < esid; i++) {
|
||||
numOfPoints = 0;
|
||||
tBlock = *pBlocks;
|
||||
|
||||
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
|
||||
if (code != TSDB_CODE_SUCCESS) break;
|
||||
|
||||
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
|
||||
|
||||
// dont include sid, vid
|
||||
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
|
||||
int32_t sversion = htonl(pBlocks->sversion);
|
||||
|
||||
if (import) {
|
||||
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
|
||||
sversion, &numOfPoints, now);
|
||||
pObj->numOfTotalPoints += numOfPoints;
|
||||
|
||||
// records for one table should be consecutive located in the payload buffer, which is guaranteed by client
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pObj->count--;
|
||||
}
|
||||
} else {
|
||||
code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
|
||||
sversion, &numOfPoints, now);
|
||||
pObj->numOfTotalPoints += numOfPoints;
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) break;
|
||||
|
||||
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
|
||||
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
|
||||
}
|
||||
|
||||
*ssid = i;
|
||||
*ppBlocks = pBlocks;
|
||||
/* Since the pBlock part can be changed by the vnodeForwardToPeer interface,
|
||||
* which is also possible to be used again. For that case, we just copy the original
|
||||
* block content back.
|
||||
*/
|
||||
if (import && (code == TSDB_CODE_ACTION_IN_PROGRESS)) {
|
||||
memcpy((void *)pBlocks, (void *)&tBlock, sizeof(SShellSubmitBlock));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
|
||||
int code = 0, ret = 0;
|
||||
int32_t i = 0;
|
||||
SShellSubmitMsg shellSubmit = *(SShellSubmitMsg *)pMsg;
|
||||
SShellSubmitMsg *pSubmit = &shellSubmit;
|
||||
SShellSubmitBlock *pBlocks = NULL;
|
||||
|
||||
pSubmit->import = htons(pSubmit->import);
|
||||
pSubmit->vnode = htons(pSubmit->vnode);
|
||||
pSubmit->numOfSid = htonl(pSubmit->numOfSid);
|
||||
|
||||
if (pSubmit->numOfSid <= 0) {
|
||||
dError("invalid num of meters:%d", pSubmit->numOfSid);
|
||||
code = TSDB_CODE_INVALID_QUERY_MSG;
|
||||
goto _submit_over;
|
||||
}
|
||||
|
||||
if (pSubmit->vnode >= TSDB_MAX_VNODES || pSubmit->vnode < 0) {
|
||||
dTrace("vnode:%d is out of range", pSubmit->vnode);
|
||||
code = TSDB_CODE_INVALID_VNODE_ID;
|
||||
goto _submit_over;
|
||||
}
|
||||
|
||||
SVnodeObj *pVnode = vnodeList + pSubmit->vnode;
|
||||
if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) {
|
||||
dError("vid:%d is not activated for submit", pSubmit->vnode);
|
||||
vnodeSendVpeerCfgMsg(pSubmit->vnode);
|
||||
code = TSDB_CODE_NOT_ACTIVE_VNODE;
|
||||
goto _submit_over;
|
||||
}
|
||||
|
||||
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
||||
code = TSDB_CODE_NO_WRITE_ACCESS;
|
||||
goto _submit_over;
|
||||
}
|
||||
|
||||
if (tsAvailDataDirGB < tsMinimalDataDirGB) {
|
||||
dError("server disk space remain %.3f GB, need at least %.3f GB, stop writing", tsAvailDataDirGB, tsMinimalDataDirGB);
|
||||
code = TSDB_CODE_SERV_NO_DISKSPACE;
|
||||
goto _submit_over;
|
||||
}
|
||||
|
||||
pObj->count = pSubmit->numOfSid; // for import
|
||||
pObj->code = 0; // for import
|
||||
pObj->numOfTotalPoints = 0;
|
||||
|
||||
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
|
||||
|
||||
pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
|
||||
i = 0;
|
||||
code = vnodeDoSubmitJob(pVnode, pSubmit->import, &i, pSubmit->numOfSid, &pBlocks, now, pObj);
|
||||
|
||||
_submit_over:
|
||||
ret = 0;
|
||||
if (pSubmit->import) { // Import case
|
||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
|
||||
SBatchSubmitInfo *pSubmitInfo =
|
||||
(SBatchSubmitInfo *)calloc(1, sizeof(SBatchSubmitInfo) + msgLen - sizeof(SShellSubmitMsg));
|
||||
if (pSubmitInfo == NULL) {
|
||||
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
|
||||
} else { // Start a timer to process the next part of request
|
||||
pSubmitInfo->import = 1;
|
||||
pSubmitInfo->vnode = pSubmit->vnode;
|
||||
pSubmitInfo->numOfSid = pSubmit->numOfSid;
|
||||
pSubmitInfo->ssid = i; // start from this position, not the initial position
|
||||
pSubmitInfo->pObj = pObj;
|
||||
pSubmitInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
|
||||
assert(pSubmitInfo->offset >= 0);
|
||||
memcpy((void *)(pSubmitInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg));
|
||||
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
|
||||
}
|
||||
} else {
|
||||
if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0);
|
||||
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
|
||||
}
|
||||
} else { // Insert case
|
||||
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
|
||||
}
|
||||
|
||||
atomic_fetch_add_32(&dnodeInsertReqNum, 1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) {
|
||||
SBatchSubmitInfo *pSubmitInfo = (SBatchSubmitInfo *)param;
|
||||
assert(pSubmitInfo != NULL && pSubmitInfo->import);
|
||||
|
||||
int32_t i = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SShellObj * pShell = pSubmitInfo->pObj;
|
||||
SVnodeObj * pVnode = &vnodeList[pSubmitInfo->vnode];
|
||||
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pSubmitInfo->blks + pSubmitInfo->offset);
|
||||
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
|
||||
i = pSubmitInfo->ssid;
|
||||
|
||||
code = vnodeDoSubmitJob(pVnode, pSubmitInfo->import, &i, pSubmitInfo->numOfSid, &pBlocks, now, pShell);
|
||||
|
||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
pSubmitInfo->ssid = i;
|
||||
pSubmitInfo->offset = ((char *)pBlocks) - pSubmitInfo->blks;
|
||||
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
|
||||
} else {
|
||||
if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0);
|
||||
tfree(param);
|
||||
vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SDnodeStatisInfo dnodeGetStatisInfo() {
|
||||
SDnodeStatisInfo info = {0};
|
||||
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
|
||||
info.httpReqNum = httpGetReqCount();
|
||||
info.selectReqNum = atomic_exchange_32(&dnodeSelectReqNum, 0);
|
||||
info.insertReqNum = atomic_exchange_32(&dnodeInsertReqNum, 0);
|
||||
info.httpReqNum = httpGetReqCount();
|
||||
info.selectReqNum = atomic_exchange_32(&tsDnodeSelectReqNum, 0);
|
||||
info.insertReqNum = atomic_exchange_32(&tsDnodeInsertReqNum, 0);
|
||||
}
|
||||
|
||||
return info;
|
||||
|
|
|
@ -52,9 +52,9 @@ static int32_t dnodeInitTmrCtl();
|
|||
|
||||
void *tsStatusTimer = NULL;
|
||||
void *vnodeTmrCtrl;
|
||||
void **rpcQhandle;
|
||||
void **tsRpcQhandle;
|
||||
void *dmQhandle;
|
||||
void *queryQhandle;
|
||||
void *tsQueryQhandle;
|
||||
int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
|
||||
int32_t tsMaxQueues;
|
||||
uint32_t tsRebootTime;
|
||||
|
@ -95,6 +95,7 @@ void dnodeCleanUpSystem() {
|
|||
tsStatusTimer = NULL;
|
||||
}
|
||||
|
||||
dnodeCleanupShell();
|
||||
dnodeCleanUpModules();
|
||||
dnodeCleanupVnodes();
|
||||
taosCloseLogger();
|
||||
|
@ -269,7 +270,7 @@ static int32_t dnodeInitQueryQHandle() {
|
|||
int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode;
|
||||
dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads);
|
||||
|
||||
queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl);
|
||||
tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -291,10 +292,10 @@ static int32_t dnodeInitRpcQHandle() {
|
|||
tsMaxQueues = 1;
|
||||
}
|
||||
|
||||
rpcQhandle = malloc(tsMaxQueues * sizeof(void *));
|
||||
tsRpcQhandle = malloc(tsMaxQueues * sizeof(void *));
|
||||
|
||||
for (int32_t i = 0; i < tsMaxQueues; ++i) {
|
||||
rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
|
||||
tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
|
||||
}
|
||||
|
||||
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dnodeUtil.h"
|
||||
|
||||
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
|
||||
return TSDB_VN_STATUS_MASTER;
|
||||
}
|
||||
|
||||
bool dnodeCheckVnodeExist(int32_t vnode) {
|
||||
return true;
|
||||
}
|
|
@ -14,15 +14,26 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dnodeWrite.h"
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
#include "dnodeWrite.h"
|
||||
|
||||
int32_t dnodeCheckTableExist(char *tableId) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t dnodeWriteData(SShellSubmitMsg *msg) {
|
||||
return 0;
|
||||
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pShellObj, void (*callback)(SShellSubmitRspMsg *, void *)) {
|
||||
SShellSubmitRspMsg result = {0};
|
||||
|
||||
int32_t numOfSid = htonl(pSubmit->numOfSid);
|
||||
if (numOfSid <= 0) {
|
||||
dError("invalid num of tables:%d", numOfSid);
|
||||
result.code = TSDB_CODE_INVALID_QUERY_MSG;
|
||||
callback(&result, pShellObj);
|
||||
}
|
||||
|
||||
//TODO: submit implementation
|
||||
}
|
||||
|
||||
int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) {
|
||||
|
|
|
@ -16,6 +16,10 @@
|
|||
#ifndef TDENGINE_HTTP_H
|
||||
#define TDENGINE_HTTP_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "tglobalcfg.h"
|
||||
#include "tlog.h"
|
||||
|
||||
|
@ -44,4 +48,8 @@
|
|||
|
||||
int32_t httpGetReqCount();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
|
|
@ -271,6 +271,7 @@ typedef struct {
|
|||
} SSubmitMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t vnode;
|
||||
int32_t sid;
|
||||
int32_t sversion;
|
||||
uint64_t uid;
|
||||
|
@ -279,12 +280,28 @@ typedef struct {
|
|||
} SShellSubmitBlock;
|
||||
|
||||
typedef struct {
|
||||
short import;
|
||||
short vnode;
|
||||
int8_t import;
|
||||
int8_t reserved[3];
|
||||
int32_t numOfSid; /* total number of sid */
|
||||
char blks[]; /* numOfSid blocks, each blocks for one meter */
|
||||
} SShellSubmitMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t vnode; // vnode index of failed block
|
||||
int32_t sid; // table index of failed block
|
||||
int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
|
||||
} SShellSubmitRspBlock;
|
||||
|
||||
typedef struct {
|
||||
int32_t code; // 0-success, 1-inprogress, > 1 error code
|
||||
int32_t numOfRows; // number of records the client is trying to write
|
||||
int32_t affectedRows; // number of records actually written
|
||||
int32_t failedRows; // number of failed records (exclude duplicate records)
|
||||
int32_t numOfFailedBlocks;
|
||||
SShellSubmitRspBlock *failedBlocks;
|
||||
} SShellSubmitRspMsg;
|
||||
|
||||
|
||||
typedef struct SSchema {
|
||||
uint8_t type;
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
|
|
|
@ -40,7 +40,7 @@ void * mgmtStatisticTimer = NULL;
|
|||
int mgmtShellConns = 0;
|
||||
int mgmtDnodeConns = 0;
|
||||
extern void * pShellConn;
|
||||
extern void ** rpcQhandle;
|
||||
extern void ** tsRpcQhandle;
|
||||
extern SMgmtIpList mgmtIpList;
|
||||
extern SMgmtIpList mgmtPublicIpList;
|
||||
extern char mgmtIpStr[TSDB_MAX_MGMT_IPS][20];
|
||||
|
|
|
@ -40,7 +40,7 @@ enum _TSDB_DB_STATUS {
|
|||
TSDB_DB_STATUS_DROP_FROM_SDB
|
||||
};
|
||||
|
||||
enum _TSDB_VN_STATUS {
|
||||
typedef enum _TSDB_VN_STATUS {
|
||||
TSDB_VN_STATUS_OFFLINE,
|
||||
TSDB_VN_STATUS_CREATING,
|
||||
TSDB_VN_STATUS_UNSYNCED,
|
||||
|
@ -48,7 +48,7 @@ enum _TSDB_VN_STATUS {
|
|||
TSDB_VN_STATUS_MASTER,
|
||||
TSDB_VN_STATUS_CLOSING,
|
||||
TSDB_VN_STATUS_DELETING,
|
||||
};
|
||||
} EVnodeStatus;
|
||||
|
||||
enum _TSDB_VN_SYNC_STATUS {
|
||||
TSDB_VN_SYNC_STATUS_INIT,
|
||||
|
|
|
@ -302,9 +302,8 @@ typedef struct {
|
|||
|
||||
// internal globals
|
||||
extern int tsMeterSizeOnFile;
|
||||
extern void ** rpcQhandle;
|
||||
|
||||
extern void * queryQhandle;
|
||||
extern void * tsQueryQhandle;
|
||||
extern int tsVnodePeers;
|
||||
extern int tsMaxVnode;
|
||||
extern int tsMaxQueues;
|
||||
|
|
|
@ -696,7 +696,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
|||
dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
|
||||
pQInfo->refCount);
|
||||
|
||||
taosScheduleTask(queryQhandle, &schedMsg);
|
||||
taosScheduleTask(tsQueryQhandle, &schedMsg);
|
||||
return pQInfo;
|
||||
|
||||
_error:
|
||||
|
@ -812,7 +812,7 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
|||
|
||||
dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo);
|
||||
|
||||
taosScheduleTask(queryQhandle, &schedMsg);
|
||||
taosScheduleTask(tsQueryQhandle, &schedMsg);
|
||||
return pQInfo;
|
||||
|
||||
_error:
|
||||
|
@ -912,7 +912,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
|
|||
schedMsg.msg = NULL;
|
||||
schedMsg.thandle = (void *)1;
|
||||
schedMsg.ahandle = pQInfo;
|
||||
taosScheduleTask(queryQhandle, &schedMsg);
|
||||
taosScheduleTask(tsQueryQhandle, &schedMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue