feature/qnode
This commit is contained in:
parent
aa518a9213
commit
4188934040
|
@ -109,6 +109,8 @@ extern const int32_t TYPE_BYTES[15];
|
|||
#define TSDB_INS_TABLE_USER_USERS "user_users"
|
||||
#define TSDB_INS_TABLE_VGROUPS "vgroups"
|
||||
|
||||
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2
|
||||
|
||||
#define TSDB_TICK_PER_SECOND(precision) \
|
||||
((int64_t)((precision) == TSDB_TIME_PRECISION_MILLI ? 1e3L \
|
||||
: ((precision) == TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
|
||||
|
|
|
@ -44,6 +44,30 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in mnode queue", pMsg);
|
||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||
int32_t code = -1;
|
||||
|
||||
pMsg->pNode = pMgmt->pMnode;
|
||||
code = mndProcessMsg(pMsg);
|
||||
|
||||
if (pRpc->msgType & 1U) {
|
||||
if (pRpc->handle == NULL) return;
|
||||
if (code != 0) {
|
||||
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .ahandle = pRpc->ahandle};
|
||||
dndSendRsp(pMgmt->pWrapper, &rsp);
|
||||
}
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
|
||||
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) {
|
||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
|
@ -102,8 +126,9 @@ int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|||
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
SSingleWorkerCfg queryCfg = {.minNum = 0, .maxNum = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &cfg) != 0) {
|
||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
|
||||
dError("failed to start mnode-query worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
|
||||
//!!!! Note: only APPEND columns in below tables, NO insert !!!!
|
||||
static const SInfosTableSchema dnodesSchema[] = {{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||
|
|
|
@ -5612,6 +5612,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
|
||||
pInfo->req.type = pInfo->type;
|
||||
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
|
||||
getFullDBNameFromCondition(pInfo->pCondition, pInfo->req.db));
|
||||
|
||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||
char* buf1 = calloc(1, contLen);
|
||||
|
|
|
@ -16,17 +16,11 @@ create1:
|
|||
return -1
|
||||
endi
|
||||
|
||||
# todo remove
|
||||
sql create database useless_db
|
||||
|
||||
sql show dnodes
|
||||
if $data4_2 != ready then
|
||||
goto create1
|
||||
endi
|
||||
|
||||
# todo remove
|
||||
sql drop database useless_db
|
||||
|
||||
print ========== stop dnode2
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
|
||||
|
||||
|
@ -103,4 +97,4 @@ if $data03 != 0 then
|
|||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue