2647 lines
87 KiB
C
2647 lines
87 KiB
C
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
|
|
#include "os.h"
|
|
#include "tcache.h"
|
|
#include "trpc.h"
|
|
#include "tscProfile.h"
|
|
#include "tscSecondaryMerge.h"
|
|
#include "tscSubquery.h"
|
|
#include "tscUtil.h"
|
|
#include "tschemautil.h"
|
|
#include "tsclient.h"
|
|
#include "tsocket.h"
|
|
#include "ttime.h"
|
|
#include "ttimer.h"
|
|
#include "tutil.h"
|
|
|
|
#define TSC_MGMT_VNODE 999
|
|
|
|
SRpcIpSet tscMgmtIpList;
|
|
SRpcIpSet tscDnodeIpSet;
|
|
|
|
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
|
|
|
|
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
|
|
void tscProcessActivityTimer(void *handle, void *tmrId);
|
|
int tscKeepConn[TSDB_SQL_MAX] = {0};
|
|
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
|
|
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
|
|
void tscSaveSubscriptionProgress(void* sub);
|
|
|
|
/* Size of the RPC head plus a fixed 100 bytes. */
static int32_t minMsgSize() {
  int32_t total = tsRpcHeadSize + 100;
  return total;
}
|
|
|
|
void tscPrintMgmtIp() {
|
|
if (tscMgmtIpList.numOfIps <= 0) {
|
|
tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
|
|
} else {
|
|
for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
|
|
tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
 * Replace the cached management-node IP list with the list in pIpList.
 *
 * numOfIps, inUse and port are passed through htons() before being stored;
 * the ip entries are copied verbatim.
 *
 * pIpList is taken from a server response (see tscProcessHeartBeatRsp()), so
 * the entry count is clamped to the capacity of tscMgmtIpList.ip before the
 * copy; an oversized count would otherwise write past the end of the array.
 */
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
  tscMgmtIpList.inUse = htons(pIpList->inUse);
  tscMgmtIpList.port = htons(pIpList->port);

  const int32_t capacity = (int32_t)tListLen(tscMgmtIpList.ip);
  if (tscMgmtIpList.numOfIps > capacity) {
    tscError("mgmt IP list from cluster holds %d entries, truncated to %d", tscMgmtIpList.numOfIps, capacity);
    tscMgmtIpList.numOfIps = capacity;
  }

  for (int32_t i = 0; i < tscMgmtIpList.numOfIps; ++i) {
    tscMgmtIpList.ip[i] = pIpList->ip[i];
  }
}
|
|
|
|
void tscSetMgmtIpListFromEdge() {
|
|
if (tscMgmtIpList.numOfIps != 1) {
|
|
tscMgmtIpList.numOfIps = 1;
|
|
tscMgmtIpList.inUse = 0;
|
|
tscMgmtIpList.port = tsMnodeShellPort;
|
|
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
|
|
tscTrace("edge mgmt IP list:");
|
|
tscPrintMgmtIp();
|
|
}
|
|
}
|
|
|
|
/*
 * Refresh the management IP list from a server-provided list.
 *
 * The cluster edition returns the current management nodes, whereas the edge
 * edition returns an empty list; in the latter case the locally configured
 * master node is used instead.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  if (pIpList->numOfIps == 0) {
    tscSetMgmtIpListFromEdge();
    return;
  }

  tscSetMgmtIpListFromCluster(pIpList);
}
|
|
|
|
/*
|
|
* For each management node, try twice at least in case of poor network situation.
|
|
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
|
|
* the poor network quality. And then, the second retry get the response with redirection command.
|
|
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
|
|
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
|
|
*/
|
|
UNUSED_FUNC
|
|
static int32_t tscGetMgmtConnMaxRetryTimes() {
|
|
int32_t factor = 2;
|
|
return tscMgmtIpList.numOfIps * factor;
|
|
}
|
|
|
|
/*
 * Callback invoked with the result of a heartbeat request.
 *
 * param: the STscObj that owns the heartbeat (ignored when NULL or when its
 *        signature does not match).
 * tres:  unused.
 * code:  0 on success; any other value is only logged.
 *
 * On success the management IP list is refreshed from the response and the
 * kill-connection / kill-query / kill-stream requests carried by the response
 * are executed. The activity timer is re-armed in every case that passes the
 * signature check.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code != 0) {
    tscTrace("heart beat failed, code:%d", code);
  } else if (pRes->pRsp == NULL) {
    // A zero-length response leaves pRsp NULL; there is nothing to apply.
    tscError("heart beat rsp is empty, pObj:%p", pObj);
  } else {
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
    SRpcIpSet *      pIpList = &pRsp->ipList;
    tscSetMgmtIpList(pIpList);

    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}
|
|
|
|
void tscProcessActivityTimer(void *handle, void *tmrId) {
|
|
STscObj *pObj = (STscObj *)handle;
|
|
|
|
if (pObj == NULL) return;
|
|
if (pObj->signature != pObj) return;
|
|
if (pObj->pTimer != tmrId) return;
|
|
|
|
if (pObj->pHb == NULL) {
|
|
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
|
|
if (NULL == pSql) return;
|
|
|
|
pSql->fp = tscProcessHeartBeatRsp;
|
|
|
|
SQueryInfo *pQueryInfo = NULL;
|
|
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
|
|
pQueryInfo->command = TSDB_SQL_HB;
|
|
|
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
|
tfree(pSql);
|
|
return;
|
|
}
|
|
|
|
pSql->param = pObj;
|
|
pSql->pTscObj = pObj;
|
|
pSql->signature = pSql;
|
|
pObj->pHb = pSql;
|
|
tscAddSubqueryInfo(&pObj->pHb->cmd);
|
|
|
|
tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
|
|
}
|
|
|
|
if (tscShouldFreeHeatBeat(pObj->pHb)) {
|
|
tscTrace("%p free HB object and release connection", pObj);
|
|
tscFreeSqlObj(pObj->pHb);
|
|
tscCloseTscObj(pObj);
|
|
return;
|
|
}
|
|
|
|
tscProcessSql(pObj->pHb);
|
|
}
|
|
|
|
/*
 * Copy the command payload into a freshly allocated RPC buffer and send it.
 *
 * Commands below TSDB_SQL_MGMT go to the vnode connection: the IP set is
 * built from the vpeer list of the first table meta and the payload is read
 * after skipping tsRpcHeadSize bytes. All other commands go to the
 * management connection, using the management IP list and the payload from
 * its first byte.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY when the RPC buffer cannot be
 * allocated, TSDB_CODE_SUCCESS otherwise.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  const bool toVnode = (pCmd->command < TSDB_SQL_MGMT);

  if (toVnode) {
    STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    STableMeta *    pMeta = pMetaInfo->pTableMeta;

    pSql->ipList.numOfIps = pMeta->numOfVpeers;
    pSql->ipList.port = tsDnodeShellPort;
    pSql->ipList.inUse = 0;

    for (int32_t peer = 0; peer < pMeta->numOfVpeers; ++peer) {
      pSql->ipList.ip[peer] = pMeta->vpeerDesc[peer].ip;
    }

    tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pCmd->msgType], pSql->ipList.port);
    memcpy(pCont, pCmd->payload + tsRpcHeadSize, pCmd->payloadLen);
  } else {
    pSql->ipList = tscMgmtIpList;
    pSql->ipList.port = tsMnodeShellPort;

    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pCmd->msgType], pSql->ipList.port);
    memcpy(pCont, pCmd->payload, pCmd->payloadLen);
  }

  SRpcMsg rpcMsg = {
      .msgType = pCmd->msgType,
      .pCont = pCont,
      .contLen = pCmd->payloadLen,
      .handle = pSql,
      .code = 0
  };

  if (toVnode) {
    rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
  } else {
    rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg);
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Handle a response (or a delivery failure) for a request sent with
 * tscSendMsgToServer().
 *
 * rpcMsg->handle is the SSqlObj that issued the request. The content buffer
 * rpcMsg->pCont is released with rpcFreeCont() on every return path of this
 * function.
 *
 * Flow:
 *  1. Drop the message when the SSqlObj or its connection is gone.
 *  2. For a set of "stale meta / node not available" codes, renew the table
 *     meta and resend the request, up to TSDB_VNODES_SUPPORT * 2 times.
 *  3. Otherwise copy the response into pRes, run the per-command response
 *     handler and invoke the user callback pSql->fp.
 *
 * NOTE(review): for TSDB_SQL_CONNECT / TSDB_SQL_HB, and when the retry limit
 * is hit, step 2 returns without invoking pSql->fp - confirm that callers do
 * not wait for the callback in those cases.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
  if (pSql == NULL) {
    // Nothing to report to; do not touch pSql, just release the content.
    tscError("sql object in rpc msg is NULL");
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);

  if (pSql->freed || pObj->signature != pObj) {
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
      /*
       * not_active_table: 1. table creation is asynchronous, so the virtual node may not have created the
       *                      table yet; try again with renewed table meta.
       *                   2. the requested table may have been removed by another client, so the table
       *                      meta has to be renewed.
       *
       * not_active_vnode: the vnode has been moved to another node by the balance procedure, or it has
       *                   been removed. Renew the table meta and try again.
       * not_active_session: the db has been moved to another node; the vnode no longer exists on this dnode.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else {
        tscTrace("%p it shall renew table meta, code:%s", pSql, tstrerror(rpcMsg->code));

        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
        pSql->res.code = rpcMsg->code;  // keep the previous error code
        if (++pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, ", pSql, pSql->retry);
          rpcFreeCont(rpcMsg->pCont);
          return;
        }

        rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);

        if (pTableMetaInfo->pTableMeta) {
          tscSendMsgToServer(pSql);
          rpcFreeCont(rpcMsg->pCont);
          return;
        }
      }
    }
  }

  pSql->retry = 0;
  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    tscTrace("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen = rpcMsg->contLen;

    // true only when pRes->pRsp holds a copy of this very response
    bool rspCopied = false;

    if (pRes->rspLen > 0) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
        rspCopied = true;
      }
    } else {
      // release the buffer of the previous response instead of dropping the pointer
      free(pRes->pRsp);
      pRes->pRsp = NULL;
    }

    // ignore the error information returned from mnode when set ignore flag in sql
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) {
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is no response callback function for the submit response.
     * The actual number of inserted points is taken from the response itself,
     * but only when a complete response body has been copied.
     */
    bool hasSubmitRsp = (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) && rspCopied &&
                        ((size_t)pRes->rspLen >= sizeof(SShellSubmitRspMsg));
    if (hasSubmitRsp) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg *)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
               pMsg->affectedRows, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
    void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
    rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;

    tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);

    /*
     * Whether to free the SSqlObj must be decided before the user callback is
     * called: the callback may free the object, and the memory may be reused
     * by another thread before tscShouldFreeAsyncSqlObj() runs, in which case
     * the check would inspect an object that belongs to somebody else.
     *
     * If that memory were re-allocated for an insert, where
     * tscKeepConn[command] is 0, tscShouldFreeAsyncSqlObj() would succeed and
     * tscFreeSqlObj() would free it immediately.
     */
    bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
    (*pSql->fp)(pSql->param, taosres, rpcMsg->code);

    if (shouldFree) {
      tscTrace("%p Async sql is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}
|
|
|
|
/*
 * Build the message for the command (for the commands whose message is built
 * at send time) and send it to the server.
 *
 * A send failure is stored in pRes->code and reported through
 * tscQueueAsyncRes(); the function itself always returns TSDB_CODE_SUCCESS.
 * The return value of the message builder is not inspected.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);
  if (sendCode != TSDB_CODE_SUCCESS) {
    pRes->code = sendCode;
    tscQueueAsyncRes(pSql);
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Dispatch an SSqlObj according to its command:
 *  - commands below TSDB_SQL_MGMT require table meta info and are sent to a
 *    vnode; TSDB_CODE_OTHERS is returned when the meta info is missing;
 *  - commands below TSDB_SQL_LOCAL use the management IP list;
 *  - all remaining commands are handled locally through tscProcessMsgRsp[].
 *
 * Join queries, two-stage super table queries and multi-vnode inserts are
 * handed to their dedicated handlers; everything else goes through
 * doProcessSql().
 *
 * Returns the code produced by the selected handler.
 */
int tscProcessSql(SSqlObj *pSql) {
  const char *name = NULL;
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint16_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
    }

    type = pQueryInfo->type;

    // for heartbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no table meta info (e.g. heartbeat); never pass NULL to %s
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, (name != NULL) ? name : "(null)",
           type);

  if (pSql->cmd.command < TSDB_SQL_MGMT) {
    // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
    pSql->ipList = tscMgmtIpList;
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  // todo handle async situation
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
      return tscHandleMasterJoinQuery(pSql);
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
        return doProcessSql(pSql);
      }
    }
  }

  if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {  // super table query
    tscHandleMasterSTableQuery(pSql);
    return pRes->code;
  } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
    tscHandleMultivnodeInsert(pSql);
    return pSql->res.code;
  }

  return doProcessSql(pSql);
}
|
|
|
|
/*
 * Cancel a two-stage super table query; any other query is left untouched.
 *
 * Every existing sub query gets res.code = TSDB_CODE_QUERY_CANCELLED. The
 * command of the sub queries is deliberately NOT changed to
 * TSDB_SQL_KILL_QUERY: doing so may leave sub queries unreleased and the
 * master SSqlObj in an abnormal state.
 *
 * Afterwards the function polls every 100 ms, for at most about 10 seconds,
 * until the master command becomes TSDB_SQL_RETRIEVE_METRIC or
 * TSDB_SQL_RETRIEVE_EMPTY_RESULT:
 *  1. if the sub queries are not launched or only partially launched, the
 *     launched ones must return so that their resources can be released;
 *  2. if no sub query has been launched yet, the query is still in the parse
 *     stage and setting res.code is all that is needed.
 */
void tscKillMetricQuery(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int idx = 0; idx < pSql->numOfSubs; ++idx) {
    if (pSql->pSubs[idx] != NULL) {
      pSql->pSubs[idx]->res.code = TSDB_CODE_QUERY_CANCELLED;
    }
  }

  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  for (;;) {
    if (pSql->cmd.command == TSDB_SQL_RETRIEVE_METRIC || pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}
|
|
|
|
/*
 * Fill in the retrieve message located tsRpcHeadSize bytes into the command
 * payload: query handle, the query type (stored in the "free" field), the
 * vgId of the first table and the content length.
 *
 * pInfo is unused. Always returns TSDB_CODE_SUCCESS.
 *
 * NOTE(review): header.contLen is taken from pSql->cmd.payloadLen, which this
 * function never sets - confirm that the caller provides the intended value.
 */
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)(pCmd->payload + tsRpcHeadSize);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  STableMeta *pMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
  pRetrieveMsg->header.vgId = htonl(pMeta->vgId);
  pRetrieveMsg->header.contLen = htonl(pCmd->payloadLen);

  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Fill in the headers of a submit message. The payload layout after the
 * first tsRpcHeadSize bytes is an SMsgDesc followed by the SSubmitMsg.
 *
 * pSql->cmd.payloadLen has already been set while the data was copied into
 * the payload; the length stored in the submit message excludes the SMsgDesc.
 *
 * pInfo is unused. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *pBody = pCmd->payload + tsRpcHeadSize;

  SMsgDesc *pMsgDesc = (SMsgDesc *)pBody;
  pMsgDesc->numOfVnodes = htonl(1);  // todo set the right number of vnodes

  SSubmitMsg *pShellMsg = (SSubmitMsg *)(pBody + sizeof(SMsgDesc));

  // NOTE: shell message size should not include SMsgDesc
  int32_t bodyLen = pCmd->payloadLen - sizeof(SMsgDesc);

  pShellMsg->header.vgId = htonl(pTableMeta->vgId);
  pShellMsg->header.contLen = htonl(bodyLen);
  pShellMsg->length = pShellMsg->header.contLen;

  // number of meters to be inserted
  pShellMsg->numOfBlocks = htonl(pCmd->numOfTablesInSubmit);

  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htonl(pMsgDesc->numOfVnodes));

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
|
|
* for meter query, simply return the size <= 1k
|
|
* for metric query, estimate size according to meter tags
|
|
*/
|
|
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
|
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
|
|
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
|
|
|
int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
|
|
|
|
int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
|
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
|
|
|
// meter query without tags values
|
|
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
|
|
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
|
|
}
|
|
|
|
int32_t size = 4096;
|
|
|
|
#if 0
|
|
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
|
|
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
|
|
|
|
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
|
|
int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
|
|
|
|
int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
|
|
if (pQueryInfo->tsBuf != NULL) {
|
|
size += pQueryInfo->tsBuf->fileSize;
|
|
}
|
|
#endif
|
|
|
|
return size;
|
|
}
|
|
|
|
/*
 * Write one STableIdInfo (sid, uid and the subscription progress key of the
 * first table of the current clause) at pMsg, in network byte order.
 *
 * vgId is only used for logging. Returns the position right after the
 * written record.
 */
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) {
  SSqlCmd *       pCmd = &pSql->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pMeta = pMetaInfo->pTableMeta;

  tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pMetaInfo->name, pMeta->uid);

  STableIdInfo *pIdInfo = (STableIdInfo *)pMsg;
  pIdInfo->sid = htonl(pMeta->sid);
  pIdInfo->uid = htobe64(pMeta->uid);
  pIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pMeta->uid));

  return pMsg + sizeof(STableIdInfo);
}
|
|
|
|
/*
 * Serialize the query described by the current clause of pSql into the
 * command payload, starting tsRpcHeadSize bytes into the buffer.
 *
 * Layout written after the fixed SQueryTableMsg header (multi-byte fields
 * are converted with htons/htonl/htobe64 unless noted):
 *   1. column list, followed by the filters of every column;
 *   2. output expressions with their parameters;
 *   3. comma separated column names (only when an arithmetic expression exists);
 *   4. table id info (sid, uid, subscription key);
 *   5. group-by columns (stored without byte-order conversion);
 *   6. interpolation default values;
 *   7. compressed timestamp block for join queries;
 *   8. tag query condition (UCS-4) and tbname condition.
 *
 * pInfo is unused.
 *
 * Returns TSDB_CODE_SUCCESS when the message is built, -1 on any failure
 * (allocation failure, inconsistent query info, schema mismatch, timestamp
 * block read error, tag condition conversion error).
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;

  if (pQueryInfo->colList.numOfCols <= 0) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;

  int32_t msgLen = 0;
  int32_t numOfTables = 0;
  int32_t vgId = 0;  // host byte order copy of head.vgId, used for logging

  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
    numOfTables = 1;
    vgId = pTableMeta->vgId;
    pQueryMsg->head.vgId = htonl(pTableMeta->vgId);
    tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
  } else {  // query on super table
    if (pTableMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex);
      return -1;
    }

    // NOTE(review): the vnode id and the table count are fixed to 1 here - confirm.
    uint32_t vnodeId = 1;
    numOfTables = 1;
    vgId = (int32_t)vnodeId;

    tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
    pQueryMsg->head.vgId = htonl(vnodeId);
  }

  if (pQueryInfo->order.order == TSQL_SO_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
  } else {
    // descending order: the window is sent with start and end swapped
    pQueryMsg->window.skey = htobe64(pQueryInfo->etime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
  }

  pQueryMsg->numOfTables = htonl(numOfTables);
  pQueryMsg->order = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
  pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
  pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
  pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);

  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);

  int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutputCols;
  if (numOfOutput < 0) {
    tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput);
    return -1;
  }

  // set column list ids; variable-length data starts right after the column list
  char *   pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
    SSchema *    pColSchema = &pSchema[pCol->colIndex.columnIndex];

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  bool hasArithmeticFunction = false;

  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
      hasArithmeticFunction = true;
    }

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
    pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);

        // one extra char keeps the string null-terminated; write it explicitly
        // instead of relying on the payload having been zeroed
        pMsg[pExpr->param[j].nLen] = 0;
        pMsg += pExpr->param[j].nLen + 1;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pSql, vgId, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      // every field is assigned, never accumulated onto whatever the payload held before
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) = pCol->colIdx;
      pMsg += sizeof(pCol->colIdx);

      *((int16_t *)pMsg) = pCol->colIdxInBuf;
      pMsg += sizeof(pCol->colIdxInBuf);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vnodeIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // A failed seek/read would leave undefined bytes in the message, so give up instead.
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0 ||
        (pBlockInfo->compLen > 0 && fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f) != 1)) {
      tscError("%p failed to read ts block from ts buffer file", pSql);
      return -1;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.numOfTagCond > 0) {
    STagCond *pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCondPos(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      size_t condLen = strlen(pCond->cond) + 1;

      bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
      if (!ret) {
        tscError("%p mbs to ucs4 failed:%s", pSql, pCond->cond);
        return -1;
      }

      pQueryMsg->tagCondLen = htons(condLen);
      pMsg += condLen * TSDB_NCHAR_SIZE;
    }
  }

  // tbname in/like query expression should be sent to mgmt node
  STagCond *pTagCond = &pQueryInfo->tagCond;
  if (pTagCond->tbnameCond.cond != NULL) {
    size_t s = strlen(pTagCond->tbnameCond.cond);
    memcpy(pMsg, pTagCond->tbnameCond.cond, s);

    pQueryMsg->nameCondLen = htons(s);
    pMsg += s;
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Finalize the CREATE DATABASE request that lives in the command payload.
 *
 * The payload buffer is used in place (this routine does not allocate it);
 * only the message type/length are stamped and the database name is copied
 * from the first table-meta slot of the current clause.
 *
 * pInfo is not consulted. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);

  // this statement is expected to carry exactly one clause
  assert(pCmd->numOfClause == 1);

  SCMCreateDbMsg *pMsg = (SCMCreateDbMsg *)pCmd->payload;
  STableMetaInfo *pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // NOTE(review): strncpy leaves db unterminated when name fills the field
  strncpy(pMsg->db, pMeta->name, tListLen(pMsg->db));

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the CREATE DNODE request.
 *
 * Allocates a payload of sizeof(SCMCreateDnodeMsg) and copies the first DCL
 * token (the node address text) into the ip field.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  // NOTE(review): the copy length is the token length; it is not bounded by
  // the size of the ip field here
  SCMCreateDnodeMsg *pMsg = (SCMCreateDnodeMsg *)pCmd->payload;
  strncpy(pMsg->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the CREATE ACCOUNT request from the parsed DCL statement.
 *
 * Copies the user name and password tokens, converts the numeric account
 * limits to network byte order and translates the textual access state
 * ("r", "w", "all", "no") into its numeric code. An empty state token is
 * encoded as -1; an unrecognized one leaves the field untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pMsg = (SCMCreateAcctMsg *)pCmd->payload;

  // credentials
  SSQLToken *pUserTok = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPassTok = &pInfo->pDCLInfo->user.passwd;
  strncpy(pMsg->user, pUserTok->z, pUserTok->n);
  strncpy(pMsg->pass, pPassTok->z, pPassTok->n);

  // numeric limits, sent in network byte order
  SCreateAcctSQL *pOpt = &pInfo->pDCLInfo->acctOpt;
  pMsg->cfg.maxUsers = htonl(pOpt->maxUsers);
  pMsg->cfg.maxDbs = htonl(pOpt->maxDbs);
  pMsg->cfg.maxTimeSeries = htonl(pOpt->maxTimeSeries);
  pMsg->cfg.maxStreams = htonl(pOpt->maxStreams);
  pMsg->cfg.maxPointsPerSecond = htonl(pOpt->maxPointsPerSecond);
  pMsg->cfg.maxStorage = htobe64(pOpt->maxStorage);
  pMsg->cfg.maxQueryTime = htobe64(pOpt->maxQueryTime);
  pMsg->cfg.maxConnections = htonl(pOpt->maxConnections);

  // access state keyword -> numeric code
  if (pOpt->stat.n == 0) {
    pMsg->cfg.accessState = -1;
  } else if (pOpt->stat.n == 1 && pOpt->stat.z[0] == 'r') {
    pMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
  } else if (pOpt->stat.n == 1 && pOpt->stat.z[0] == 'w') {
    pMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
  } else if (pOpt->stat.n == 3 && strncmp(pOpt->stat.z, "all", 3) == 0) {
    pMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
  } else if (pOpt->stat.n == 2 && strncmp(pOpt->stat.z, "no", 2) == 0) {
    pMsg->cfg.accessState = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build a CREATE USER or ALTER USER request.
 *
 * The user type from the parsed statement selects both the message content
 * and the message type:
 *   - TSDB_ALTER_USER_PRIVILEGES: carries the privilege code (pCmd->count),
 *     sent as ALTER USER;
 *   - TSDB_ALTER_USER_PASSWD: carries the password, sent as ALTER USER;
 *   - anything else: carries the password, sent as CREATE USER.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pMsg = (SCMCreateUserMsg *)pCmd->payload;
  SUserInfo *       pUser = &pInfo->pDCLInfo->user;

  strncpy(pMsg->user, pUser->user.z, pUser->user.n);
  pMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pMsg->privilege = (char)pCmd->count;
  } else {
    // password change and user creation both ship the password token
    strncpy(pMsg->pass, pUser->passwd.z, pUser->passwd.n);
  }

  bool isAlter = (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES);
  if (isAlter) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Prepare the dnode-configuration request.
 *
 * Only the payload allocation and the message type/length are set up here;
 * no field of SCMCfgDnodeMsg is written by this routine.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the DROP DATABASE request.
 *
 * The database name comes from the first table-meta slot of the current
 * clause; ignoreNotExists mirrors the statement's existsCheck flag as 0/1.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;

  SCMDropDbMsg *  pMsg = (SCMDropDbMsg *)pCmd->payload;
  STableMetaInfo *pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  strncpy(pMsg->db, pMeta->name, tListLen(pMsg->db));

  if (pInfo->pDCLInfo->existsCheck) {
    pMsg->ignoreNotExists = 1;
  } else {
    pMsg->ignoreNotExists = 0;
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the DROP TABLE request.
 *
 * The table id is the name stored in the first table-meta slot of the
 * current clause; igNotExists mirrors the statement's existsCheck flag.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropTableMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;

  STableMetaInfo * pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropTableMsg *pMsg = (SCMDropTableMsg *)pCmd->payload;

  strcpy(pMsg->tableId, pMeta->name);

  if (pInfo->pDCLInfo->existsCheck) {
    pMsg->igNotExists = 1;
  } else {
    pMsg->igNotExists = 0;
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the DROP DNODE request.
 *
 * The node address is read from the name field of the first table-meta slot
 * of the current clause and copied into the message's ip field.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  STableMetaInfo * pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropDnodeMsg *pMsg = (SCMDropDnodeMsg *)pCmd->payload;
  strcpy(pMsg->ip, pMeta->name);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the DROP USER request.
 *
 * The user name is read from the name field of the first table-meta slot of
 * the current clause.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated. The message type and length are set before the
 * allocation is attempted.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCmd->payload;
  strcpy(pMsg->user, pMeta->name);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the DROP ACCOUNT request.
 *
 * The message reuses the SCMDropUserMsg layout; the account name is read
 * from the name field of the first table-meta slot of the current clause.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated. The message type and length are set before the
 * allocation is attempted.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCmd->payload;
  strcpy(pMsg->user, pMeta->name);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the USE <db> request.
 *
 * The database name is read from the name field of the first table-meta slot
 * of the current clause.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMUseDbMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  STableMetaInfo *pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMUseDbMsg *   pMsg = (SCMUseDbMsg *)pCmd->payload;
  strcpy(pMsg->db, pMeta->name);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the SHOW request.
 *
 * The database is taken from the name in the first table-meta slot when one
 * is present, otherwise from the connection's current database. Depending on
 * the show type, the trailing payload carries either the wildcard pattern
 * (when a pattern token was parsed) or, for TSDB_MGMT_TABLE_VNODES, the
 * address token stored in showOpt.prefix.
 *
 * The trailing-payload length is kept in host byte order locally: it is
 * written to the wire with htons(), but the local pCmd->payloadLen must be
 * computed from the host-order value, not from the already-converted field.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg *)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t          nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
  } else {
    strcpy(pShowMsg->db, pObj->db);
  }

  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  // number of bytes appended after the fixed header, in HOST byte order
  uint16_t extraLen = 0;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      extraLen = (uint16_t)pPattern->n;
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);

    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    extraLen = (uint16_t)pIpAddr->n;
  }

  // wire field is big-endian; the local message length stays host-order
  pShowMsg->payloadLen = htons(extraLen);
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build a KILL QUERY / KILL CONNECTION / KILL STREAM request.
 *
 * The target identifier is copied from the statement's ip token into the
 * queryId field. The message type follows pCmd->command; for any other
 * command value the message type is left unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMKillQueryMsg *pMsg = (SCMKillQueryMsg *)pCmd->payload;
  strncpy(pMsg->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);

  if (pCmd->command == TSDB_SQL_KILL_QUERY) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (pCmd->command == TSDB_SQL_KILL_CONNECTION) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (pCmd->command == TSDB_SQL_KILL_STREAM) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Estimate the payload size needed for a CREATE TABLE request.
 *
 * The estimate is the sum of:
 *   - minMsgSize() plus the management head and the fixed message header;
 *   - one STagData when the table is created from a super table, otherwise
 *     one SSchema per column and tag (pCmd->numOfCols + pCmd->count);
 *   - the select text plus one byte when a select clause is attached;
 *   - TSDB_EXTRA_PAYLOAD_SIZE of slack.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *        pCmd = &(pSql->cmd);
  SCreateTableSQL *pCreate = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);

  total += (pCreate->type == TSQL_CREATE_TABLE_FROM_STABLE) ? sizeof(STagData)
                                                            : sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);

  if (pCreate->pSelect != NULL) {
    total += (pCreate->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}
|
|
|
|
/*
 * Build the CREATE TABLE request.
 *
 * Layout written after the fixed SCMCreateTableMsg header:
 *   - created from a super table: one STagData holding the tag values;
 *   - otherwise: one SSchema per column and tag, followed (for a stream
 *     table) by the NUL-terminated select statement text.
 *
 * The select text is copied by its token length and terminated explicitly.
 * The token is a slice of the SQL text, so the byte following it is not
 * guaranteed to be a terminator and must not be copied as one.
 *
 * The field info of the query is cleared once the schema has been emitted.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int      msgLen = 0;
  SSchema *pSchema;
  int      size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
    pMsg += sizeof(STagData);
  } else {  // create (super) table
    pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
      size_t     sqlLen = pQuerySql->selectToken.n;

      // copy exactly the token text, then terminate it ourselves
      memcpy(pMsg, pQuerySql->selectToken.z, sqlLen);
      pMsg[sqlLen] = '\0';

      pCreateTableMsg->sqlLen = htons(sqlLen + 1);
      pMsg += sqlLen + 1;
    }
  }

  tscClearFieldInfo(&pQueryInfo->fieldsInfo);

  msgLen = pMsg - (char *)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  // the estimate must leave minMsgSize() of headroom beyond the built message
  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Estimate the payload size needed for an ALTER TABLE request: the minimum
 * message size, the management head, the fixed message header, one SSchema
 * per field of the first query info, plus TSDB_EXTRA_PAYLOAD_SIZE of slack.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t fixedPart = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg);
  size_t schemaPart = sizeof(SSchema) * tscNumOfFields(pQueryInfo);

  return fixedPart + schemaPart + TSDB_EXTRA_PAYLOAD_SIZE;
}
|
|
|
|
/*
 * Build the ALTER TABLE request.
 *
 * Writes the database (derived from the table id), the table id, the alter
 * type, the number of fields, TSDB_MAX_TAGS_LEN bytes of tag value data and
 * one SSchema entry per field of the first query info.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterMsg = (SCMAlterTableMsg *)pCmd->payload;
  SAlterTableSQL *  pAlterInfo = pInfo->pAlterInfo;

  // fixed header
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterMsg->db);
  strcpy(pAlterMsg->tableId, pTableMetaInfo->name);
  pAlterMsg->type = htons(pAlterInfo->type);
  pAlterMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
  memcpy(pAlterMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);

  // one schema entry per field
  SSchema *pDst = pAlterMsg->schema;
  for (int idx = 0; idx < tscNumOfFields(pQueryInfo); ++idx, ++pDst) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, idx);

    pDst->type = pField->type;
    strcpy(pDst->name, pField->name);
    pDst->bytes = htons(pField->bytes);
  }

  // the message ends right after the last schema entry
  int msgLen = (char *)pDst - (char *)pAlterMsg;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the ALTER DATABASE request.
 *
 * Only the database name (from the first table-meta slot of the current
 * clause) is written by this routine.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated. The message type and length are set before the
 * allocation is attempted.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMAlterDbMsg * pMsg = (SCMAlterDbMsg *)pCmd->payload;
  strcpy(pMsg->db, pMeta->name);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the RETRIEVE request sent to the management node.
 *
 * Carries the query handle of the current result (64-bit, big-endian) and
 * the type of the first query info in the 'free' field (16-bit, network
 * byte order).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SQueryInfo *       pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pMsg = (SRetrieveTableMsg *)pCmd->payload;

  pMsg->qhandle = htobe64(pSql->res.qhandle);
  pMsg->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Point every output column of the result at its slice of pRes->data.
 *
 * After the per-column pointer table has been created, column i starts at
 * data + offset(i) * numOfRows, where offset(i) comes from the field info.
 *
 * Returns 0 on success, or pRes->code when the pointer table cannot be
 * created.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutputCols) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = (pRes->data + colOffset * pRes->numOfRows);
    ++col;
  }

  return 0;
}
|
|
|
|
/*
|
|
* this function can only be called once.
|
|
* by using pRes->rspType to denote its status
|
|
*
|
|
* if pRes->rspType is 1, no more result
|
|
*/
|
|
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
|
|
SSqlRes *pRes = &pSql->res;
|
|
SSqlCmd *pCmd = &pSql->cmd;
|
|
|
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
|
|
|
pRes->code = TSDB_CODE_SUCCESS;
|
|
|
|
if (pRes->rspType == 0) {
|
|
pRes->numOfRows = numOfRes;
|
|
pRes->row = 0;
|
|
pRes->rspType = 1;
|
|
|
|
tscSetResultPointer(pQueryInfo, pRes);
|
|
} else {
|
|
tscResetForNextRetrieve(pRes);
|
|
}
|
|
|
|
uint8_t code = pSql->res.code;
|
|
if (pSql->fp) {
|
|
if (code == TSDB_CODE_SUCCESS) {
|
|
(*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
|
|
} else {
|
|
tscQueueAsyncRes(pSql);
|
|
}
|
|
}
|
|
|
|
return code;
|
|
}
|
|
|
|
/*
 * Produce the local result for DESCRIBE: one row per column plus one row
 * per tag of the table referenced by the current clause.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd *       pCmd = &pSql->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   info = tscGetTableInfo(pMetaInfo->pTableMeta);

  int32_t rows = info.numOfColumns + info.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rows);
}
|
|
|
|
/*
 * Produce the local result for a tag retrieve.
 *
 * The per-function row counting is compiled out (see the disabled block), so
 * the result is currently always published with zero rows.
 */
int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
  int32_t rows = 0;

#if 0
  // disabled legacy logic; it relied on:
  //   SSqlCmd *pCmd = &pSql->cmd;
  //   SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  //   STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
    rows = pTableMetaInfo->pMetricMeta->numOfTables;
  } else {
    rows = 1;  // for count function, there is only one output.
  }

#endif

  return tscLocalResultCommonBuilder(pSql, rows);
}
|
|
|
|
/*
 * Finish a super-table retrieve by running the local reduce step.
 *
 * When the reduce succeeds and produced rows, the column pointers of the
 * result are set up. The row cursor is always reset. When an async callback
 * is registered it is invoked with the row count on success; on failure the
 * error is queued through tscQueueAsyncRes().
 *
 * Returns the reduce result code, captured before the callback is invoked.
 */
int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  bool hasRows = (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0);
  if (hasRows) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;

  uint8_t code = pRes->code;
  if (!pSql->fp) {
    return code;
  }

  // async retrieve metric data
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
  }

  return code;
}
|
|
|
|
/* Publish an empty (zero-row) local result for pSql. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t noRows = 0;
  return tscLocalResultCommonBuilder(pSql, noRows);
}
|
|
|
|
/*
 * Build the CONNECT request.
 *
 * The database sent is the connection's db string with everything up to and
 * including the first TS_PATH_DELIMITER removed (the whole string when no
 * delimiter is present). The client version is copied from the global
 * 'version'; the message version is left as an empty string.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pCmd->payloadLen = sizeof(SCMConnectMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pMsg = (SCMConnectMsg *)pCmd->payload;

  // skip past the first delimiter, if there is one
  char *dbName = strstr(pObj->db, TS_PATH_DELIMITER);
  if (dbName != NULL) {
    dbName += 1;
  } else {
    dbName = pObj->db;
  }

  strcpy(pMsg->db, dbName);
  strcpy(pMsg->clientVersion, version);
  strcpy(pMsg->msgVersion, "");

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the TABLE META request for the first table of the first query info.
 *
 * The message is assembled in place over the existing command payload. That
 * payload may hold tag data (STagData) that has to travel with the request
 * when the table is to be auto-created, so the whole buffer is snapshotted
 * first and the tag data is copied back from the snapshot into the message.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the
 * snapshot cannot be allocated.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  // NOTE(review): msgLen is never updated, so the closing assert only checks
  // that minMsgSize() fits into the allocation - confirm whether it was meant
  // to track payloadLen
  int msgLen = 0;

  // snapshot the current payload before it is overwritten
  char *snapshot = NULL;
  if (pCmd->allocSize > 0) {
    snapshot = calloc(1, pCmd->allocSize);
    if (snapshot == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(snapshot, pCmd->payload, pCmd->allocSize);
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pEnd = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated) {
    // NOTE(review): snapshot is NULL here when allocSize was 0 - verify the
    // caller always allocates the payload before requesting auto-create
    memcpy(pInfoMsg->tags, snapshot, sizeof(STagData));
    pEnd += sizeof(STagData);
  }

  pCmd->payloadLen = pEnd - (char *)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  tfree(snapshot);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/**
|
|
* multi meter meta req pkg format:
|
|
* | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
|
|
* no used 4B
|
|
**/
|
|
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|
SSqlCmd *pCmd = &pSql->cmd;
|
|
|
|
// copy payload content to temp buff
|
|
char *tmpData = 0;
|
|
if (pCmd->payloadLen > 0) {
|
|
tmpData = calloc(1, pCmd->payloadLen + 1);
|
|
if (NULL == tmpData) return -1;
|
|
memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
|
|
}
|
|
|
|
// fill head info
|
|
SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
|
|
memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db
|
|
|
|
SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
|
|
pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
|
|
|
|
if (pCmd->payloadLen > 0) {
|
|
memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
|
|
}
|
|
|
|
tfree(tmpData);
|
|
|
|
pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
|
|
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
|
|
|
|
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
|
|
|
|
tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
|
|
pCmd->payloadLen);
|
|
|
|
return pCmd->payloadLen;
|
|
}
|
|
|
|
/*
 * Estimate the payload size for a super-table meta request.
 *
 * The estimate adds up:
 *   - a fixed part (minimum message size, message header, management head
 *     and one int16_t per possible tag);
 *   - the tag query conditions and the table-name condition, each counted
 *     at TSDB_NCHAR_SIZE bytes per character;
 *   - the join condition (two table ids with their tag column index);
 *   - one element header per table and one SColIndexEx per group-by column.
 *
 * Condition slots without text (NULL) contribute nothing to the estimate.
 *
 * Returns the estimate, but never less than TSDB_DEFAULT_PAYLOAD_SIZE.
 */
static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  int32_t n = 0;
  for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
    // a slot may carry no condition text; strlen(NULL) is undefined
    if (pQueryInfo->tagCond.cond[i].cond != NULL) {
      n += strlen(pQueryInfo->tagCond.cond[i].cond);
    }
  }

  int32_t tagLen = n * TSDB_NCHAR_SIZE;
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
  }

  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;

  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols * sizeof(SColIndexEx);

  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
}
|
|
|
|
/*
 * Build the request that asks for the vgroup list of a super table.
 *
 * The message is written directly over the existing command payload (no
 * allocation happens here) and carries only the table id taken from the
 * first table-meta slot of the current clause.
 *
 * pInfo is not consulted. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *)pCmd->payload;
  // NOTE(review): strncpy leaves tableId unterminated when name fills the field
  strncpy(pStableVgroupMsg->tableId, pTableMetaInfo->name, tListLen(pStableVgroupMsg->tableId));

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = sizeof(SCMSTableVgroupMsg);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Estimate the payload size of a heartbeat message.
 *
 * Counts the RPC head, the management head, the query and stream list
 * headers, one SQueryDesc per SQL object on the connection's sqlList and one
 * SStreamDesc per stream on its streamList, plus TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * The lists are walked without taking any lock here.
 */
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  int      total = 0;

  total += tsRpcHeadSize + sizeof(SMgmtHead);

  // running queries
  total += sizeof(SQqueryList);
  for (SSqlObj *pIter = pObj->sqlList; pIter; pIter = pIter->next) {
    total += sizeof(SQueryDesc);
  }

  // running streams
  total += sizeof(SStreamList);
  for (SSqlStream *pIter = pObj->streamList; pIter; pIter = pIter->next) {
    total += sizeof(SStreamDesc);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}
|
|
|
|
/*
 * Build the heartbeat message.
 *
 * The connection mutex is held while the size is estimated, the payload is
 * allocated and the query/stream descriptions are serialized, and is
 * released before the message length is computed. The message starts
 * tsRpcHeadSize bytes into the payload with a management head carrying the
 * connection's database.
 *
 * Returns the message length, or -1 when the payload cannot be allocated.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int size = tscEstimateHeartBeatMsgLength(pSql);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  // management head
  SMgmtHead *pMgmt = (SMgmtHead *)pStart;
  strcpy(pMgmt->db, pObj->db);

  // query and stream descriptions follow the head
  char *pEnd = tscBuildQueryStreamDesc(pStart + sizeof(SMgmtHead), pObj);
  pthread_mutex_unlock(&pObj->mutex);

  int msgLen = pEnd - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}
|
|
|
|
/*
 * Handle the response of a table-meta request.
 *
 * The STableMetaMsg in pSql->res.pRsp is converted in place (header fields,
 * vpeer descriptors and every schema entry), validated, turned into a
 * STableMeta by tscCreateTableMetaFromMsg and stored in the client cache under
 * the name of the first table meta info of the command. The pointer returned
 * by the cache is kept in pTableMetaInfo->pTableMeta.
 *
 * @param pSql sql object holding the response
 * @return TSDB_CODE_SUCCESS on success,
 *         TSDB_CODE_INVALID_VALUE if sid/vgId/numOfTags/numOfColumns are out of range,
 *         TSDB_CODE_CLI_OUT_OF_MEMORY if the meta cannot be built or cached
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->vgId = htonl(pMetaMsg->vgId);
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) {
    tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
    pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
    pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip);
    pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId);
  }

  // columns first, then tags; both are converted
  SSchema *pSchema = pMetaMsg->schema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
    pSchema++;
  }

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // nothing to cache; presumably an allocation failure inside the helper
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

#if 0
  // if current table is created according to super table, get the table meta of super table
  if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
    char id[TSDB_TABLE_ID_LEN + 1] = {0};
    strncpy(id, pMetaMsg->stableId, TSDB_TABLE_ID_LEN);

    // NOTE: if the table meta of super table is not cached at client side yet, the pSTable is NULL
    pTableMeta->pSTable = taosCacheAcquireByName(tscCacheHandle, id);
  }
#endif

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta =
      (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer);

  // The locally built meta is only the source of the put. It is released on
  // both outcomes, so the failure path no longer leaks it.
  free(pTableMeta);

  if (pTableMetaInfo->pTableMeta == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/**
|
|
* multi meter meta rsp pkg format:
|
|
* | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
|
|
* |...... 1B 1B 4B
|
|
**/
|
|
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
|
// uint8_t ieType;
|
|
// int32_t totalNum;
|
|
// int32_t i;
|
|
#if 0
|
|
char *rsp = pSql->res.pRsp;
|
|
|
|
ieType = *rsp;
|
|
if (ieType != TSDB_IE_TYPE_META) {
|
|
tscError("invalid ie type:%d", ieType);
|
|
pSql->res.code = TSDB_CODE_INVALID_IE;
|
|
pSql->res.numOfTotal = 0;
|
|
return TSDB_CODE_OTHERS;
|
|
}
|
|
|
|
rsp++;
|
|
|
|
SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
|
|
totalNum = htonl(pInfo->numOfTables);
|
|
rsp += sizeof(SCMMultiTableInfoMsg);
|
|
|
|
for (i = 0; i < totalNum; i++) {
|
|
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
|
STableMeta * pMeta = pMultiMeta->metas;
|
|
|
|
pMeta->sid = htonl(pMeta->sid);
|
|
pMeta->sversion = htons(pMeta->sversion);
|
|
pMeta->vgId = htonl(pMeta->vgId);
|
|
pMeta->uid = htobe64(pMeta->uid);
|
|
|
|
if (pMeta->sid <= 0 || pMeta->vgId < 0) {
|
|
tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
|
|
pSql->res.code = TSDB_CODE_INVALID_VALUE;
|
|
pSql->res.numOfTotal = i;
|
|
return TSDB_CODE_OTHERS;
|
|
}
|
|
|
|
// pMeta->numOfColumns = htons(pMeta->numOfColumns);
|
|
//
|
|
// if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
|
|
// tscError("invalid tag value count:%d", pMeta->numOfTags);
|
|
// pSql->res.code = TSDB_CODE_INVALID_VALUE;
|
|
// pSql->res.numOfTotal = i;
|
|
// return TSDB_CODE_OTHERS;
|
|
// }
|
|
//
|
|
// if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
|
|
// tscError("invalid numOfTags:%d", pMeta->numOfTags);
|
|
// pSql->res.code = TSDB_CODE_INVALID_VALUE;
|
|
// pSql->res.numOfTotal = i;
|
|
// return TSDB_CODE_OTHERS;
|
|
// }
|
|
//
|
|
// if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
|
|
// tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
|
|
// pSql->res.code = TSDB_CODE_INVALID_VALUE;
|
|
// pSql->res.numOfTotal = i;
|
|
// return TSDB_CODE_OTHERS;
|
|
// }
|
|
//
|
|
// for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
|
|
// pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
|
|
// }
|
|
//
|
|
// pMeta->rowSize = 0;
|
|
// rsp += sizeof(SMultiTableMeta);
|
|
// pSchema = (SSchema *)rsp;
|
|
//
|
|
// int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
|
|
// for (int j = 0; j < numOfTotalCols; ++j) {
|
|
// pSchema->bytes = htons(pSchema->bytes);
|
|
// pSchema->colId = htons(pSchema->colId);
|
|
//
|
|
// // ignore the tags length
|
|
// if (j < pMeta->numOfColumns) {
|
|
// pMeta->rowSize += pSchema->bytes;
|
|
// }
|
|
// pSchema++;
|
|
// }
|
|
//
|
|
// rsp += numOfTotalCols * sizeof(SSchema);
|
|
//
|
|
// int32_t tagLen = 0;
|
|
// SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
|
|
//
|
|
// if (pMeta->tableType == TSDB_CHILD_TABLE) {
|
|
// for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
|
|
// tagLen += pTagsSchema[j].bytes;
|
|
// }
|
|
// }
|
|
//
|
|
// rsp += tagLen;
|
|
// int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache
|
|
//
|
|
// pMeta->index = 0;
|
|
// (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
|
|
// }
|
|
}
|
|
|
|
pSql->res.code = TSDB_CODE_SUCCESS;
|
|
pSql->res.numOfTotal = i;
|
|
tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
|
|
#endif
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
|
|
/*
 * Handle the response of a super-table vgroup request.
 *
 * The SCMSTableVgroupRspMsg in pSql->res.pRsp has its numOfDnodes converted in
 * place. Its dnodeIps entries are then collected into a newly created array
 * that is stored in vgroupIdList of the first table meta info of the current
 * clause of the PARENT sql object (pSql->param), not of pSql itself.
 *
 * The stale, compiled-out metric-meta parser that used to precede this logic
 * referenced undeclared variables and has been removed.
 *
 * @param pSql sql object holding the response; pSql->param must be the parent
 * @return pSql->res.code; TSDB_CODE_CLI_OUT_OF_MEMORY (also stored in
 *         pSql->res.code) if the id list cannot be allocated
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pSql->res.pRsp;
  pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes);

  SSqlObj *pparent = pSql->param;
  assert(pparent != NULL);

  SSqlCmd        *pCmd = &pparent->cmd;
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  pInfo->vgroupIdList = taosArrayInit(pStableVgroup->numOfDnodes, sizeof(int32_t));
  if (pInfo->vgroupIdList == NULL) {
    // do not push into a list that was never created
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    return pSql->res.code;
  }

  // todo opt performance
  // NOTE(review): the entries are stored exactly as received, without any
  // byte-order conversion -- confirm this is what the consumers expect.
  for (int32_t i = 0; i < pStableVgroup->numOfDnodes; ++i) {
    taosArrayPush(pInfo->vgroupIdList, &pStableVgroup->dnodeIps[i]);
  }

  return pSql->res.code;
}
|
|
|
|
/*
|
|
* current process do not use the cache at all
|
|
*/
|
|
int tscProcessShowRsp(SSqlObj *pSql) {
|
|
STableMetaMsg * pMetaMsg;
|
|
SCMShowRsp *pShow;
|
|
SSchema * pSchema;
|
|
char key[20];
|
|
|
|
SSqlRes *pRes = &pSql->res;
|
|
SSqlCmd *pCmd = &pSql->cmd;
|
|
|
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
|
|
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
|
|
|
pShow = (SCMShowRsp *)pRes->pRsp;
|
|
pShow->qhandle = htobe64(pShow->qhandle);
|
|
pRes->qhandle = pShow->qhandle;
|
|
|
|
tscResetForNextRetrieve(pRes);
|
|
pMetaMsg = &(pShow->tableMeta);
|
|
|
|
pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
|
|
|
|
pSchema = pMetaMsg->schema;
|
|
pMetaMsg->sid = ntohs(pMetaMsg->sid);
|
|
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
|
pSchema->bytes = htons(pSchema->bytes);
|
|
pSchema++;
|
|
}
|
|
|
|
key[0] = pCmd->msgType + 'a';
|
|
strcpy(key + 1, "showlist");
|
|
|
|
taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
|
|
size_t size = 0;
|
|
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
|
|
|
|
pTableMetaInfo->pTableMeta =
|
|
(STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsMeterMetaKeepTimer);
|
|
|
|
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
|
|
SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
|
|
|
tscColumnBaseInfoReserve(&pQueryInfo->colList, pMetaMsg->numOfColumns);
|
|
SColumnIndex index = {0};
|
|
|
|
for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
|
index.columnIndex = i;
|
|
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
|
tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pTableSchema[i]);
|
|
|
|
pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
|
|
pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes);
|
|
}
|
|
|
|
tscFieldInfoCalOffset(pQueryInfo);
|
|
|
|
tfree(pTableMeta);
|
|
return 0;
|
|
}
|
|
|
|
/*
 * Handle the response of a connect request.
 *
 * Copies the account id, server version and the write/super privileges from
 * the SCMConnectRsp into the connection object, rewrites pObj->db into its
 * account-qualified form "<acctId><TS_PATH_DELIMITER><db>" and (re)starts the
 * activity timer of the connection.
 *
 * @param pSql sql object holding the response
 * @return 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char     temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;

  // NOTE(review): acctId and serverVersion are copied without a bound; the
  // sizes of the destination fields are not visible here -- confirm they fit.
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response

  // bounded formatting: never writes past the end of temp
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  // the qualified name INCLUDING its terminator has to fit into pObj->db
  assert(len >= 0 && len < (int32_t)tListLen(pObj->db));

  // keep pObj->db terminated even when asserts are compiled out
  strncpy(pObj->db, temp, tListLen(pObj->db) - 1);
  pObj->db[tListLen(pObj->db) - 1] = 0;

  strcpy(pObj->sversion, pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}
|
|
|
|
/*
 * Handle the response of a USE <db> request: the name stored in the first
 * table meta info of the command becomes the current database of the
 * connection.
 *
 * The copy is bounded by the size of pObj->db and always terminated, so an
 * over-long name is truncated instead of overrunning the field.
 *
 * @param pSql sql object of the USE command
 * @return 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj        *pObj = pSql->pTscObj;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  strncpy(pObj->db, pTableMetaInfo->name, tListLen(pObj->db) - 1);
  pObj->db[tListLen(pObj->db) - 1] = 0;
  return 0;
}
|
|
|
|
/*
 * Handle the response of a DROP DATABASE request by emptying the client
 * cache (tscCacheHandle). The sql object itself is not inspected.
 *
 * @return 0
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}
|
|
|
|
/*
 * Handle the response of a DROP TABLE request.
 *
 * If the dropped table has an entry in the client cache, that entry is
 * force-released, and so is the table meta still referenced by the command's
 * first table meta info. Nothing is done when the table is not cached.
 *
 * Why the forced release:
 * 1. dropping the only table of a vnode causes the vnode itself to be removed;
 * 2. a user may then create a new metric and a table with the same name but a
 *    different schema, which will reside in a new vnode.
 * The cached meta of the old table would then be stale.
 *
 * @param pSql sql object of the DROP TABLE command
 * @return 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta after drop table:%s", pSql, pInfo->name);

    // drop the reference acquired above
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    // and the one owned by the command, if any
    if (pInfo->pTableMeta) {
      taosCacheRelease(tscCacheHandle, (void **)&(pInfo->pTableMeta), true);
    }
  }

  return 0;
}
|
|
|
|
/*
 * Handle the response of an ALTER TABLE request.
 *
 * If the altered table is cached, its cache entry and the table meta held by
 * the command's first table meta info are force-released. When that table is
 * a super table the whole client cache is emptied as well. Nothing is done
 * when the table is not cached.
 *
 * @param pSql sql object of the ALTER TABLE command
 * @return 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pInfo->name);
  if (pCached == NULL) {
    return 0;  // not in cache, nothing to invalidate
  }

  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

  if (!pInfo->pTableMeta) {
    return 0;
  }

  // sampled before the release below: the macro is handed the meta info whose
  // pTableMeta is about to be released
  bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pInfo);

  taosCacheRelease(tscCacheHandle, (void **)&(pInfo->pTableMeta), true);

  if (isSuperTable) {  // if it is a super table, reset whole query cache
    tscTrace("%p reset query cache since table:%s is stable", pSql, pInfo->name);
    taosCacheEmpty(tscCacheHandle);
  }

  return 0;
}
|
|
|
|
/*
 * Handle the response of an ALTER DATABASE request. No client-side state is
 * updated; the handler only reports success.
 *
 * @param pSql sql object of the command (not used)
 * @return 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}
|
|
|
|
/*
 * Handle the response of a query request: the query handle of the
 * SQueryTableRsp is converted in place and mirrored into the result object,
 * the data pointer is cleared and the result is reset for the next retrieve.
 *
 * @param pSql sql object holding the response
 * @return 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  // the response buffer itself is updated, then copied into the result
  pRsp->qhandle = htobe64(pRsp->qhandle);
  pRes->qhandle = pRsp->qhandle;

  pRes->data = NULL;

  tscResetForNextRetrieve(pRes);
  return 0;
}
|
|
|
|
/*
 * Handle the response of a retrieve (fetch) request.
 *
 * Row count, precision, offset, elapsed time and the completed flag of the
 * SRetrieveTableRsp are copied into the result object, and the result column
 * pointers are set up for the query of the current clause.
 *
 * For subscriptions the row data is followed by a progress trailer: a 32-bit
 * table count, then one (64-bit uid, TSKEY) pair per table. Each pair is
 * handed to tscUpdateSubscriptionProgress. The trailer starts at an arbitrary
 * byte offset behind the rows, so its fields are read with memcpy rather than
 * through casted pointers.
 *
 * @param pSql sql object holding the response
 * @return 0
 */
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset = htobe64(pRetrieve->offset);
  pRes->useconds = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;

    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // end of the last output column == end of the row data == start of trailer
    char *p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;

  // both values are widened explicitly so that the format matches the argument
  // types whatever the declared width of the fields is
  tscTrace("%p numOfRows:%" PRId64 ", offset:%" PRId64, pSql, (int64_t)pRes->numOfRows, (int64_t)pRes->offset);

  return 0;
}
|
|
|
|
/*
 * Handle a retrieve response for a locally processed command: only the row
 * count and the data pointer of the SRetrieveTableRsp are taken over, the
 * result column pointers are set up for the first query of the command and
 * the row cursor is rewound.
 *
 * @param pSql sql object holding the response
 * @return 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SSqlRes    *pRes = &pSql->res;

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pRes->pRsp;
  pRes->data = pRsp->data;
  pRes->numOfRows = htonl(pRsp->numOfRows);

  tscSetResultPointer(pQueryInfo, pRes);

  pRes->row = 0;
  return 0;
}
|
|
|
|
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
|
|
|
|
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
|
|
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
|
|
if (NULL == pNew) {
|
|
tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
|
|
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
|
}
|
|
|
|
pNew->pTscObj = pSql->pTscObj;
|
|
pNew->signature = pNew;
|
|
pNew->cmd.command = TSDB_SQL_META;
|
|
|
|
tscAddSubqueryInfo(&pNew->cmd);
|
|
|
|
SQueryInfo *pNewQueryInfo = NULL;
|
|
tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
|
|
|
|
pNew->cmd.autoCreated = pSql->cmd.autoCreated; // create table if not exists
|
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
|
tscError("%p malloc failed for payload to get meter meta", pSql);
|
|
free(pNew);
|
|
|
|
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
|
}
|
|
|
|
STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
|
|
assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
|
|
|
|
strncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, tListLen(pNewMeterMetaInfo->name));
|
|
memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE); // tag information if table does not exists.
|
|
tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);
|
|
|
|
pNew->fp = tscTableMetaCallBack;
|
|
pNew->param = pSql;
|
|
|
|
int32_t code = tscProcessSql(pNew);
|
|
if (code == TSDB_CODE_SUCCESS) {
|
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
|
}
|
|
|
|
return code;
|
|
}
|
|
|
|
/*
 * Make sure pTableMetaInfo refers to the table meta of pTableMetaInfo->name.
 *
 * A meta already held by pTableMetaInfo is released first. The client cache is
 * then consulted; on a hit the cached meta is attached and the call succeeds
 * immediately, otherwise the meta is requested through getTableMetaFromMgmt.
 *
 * @param pSql           sql object on whose behalf the meta is needed
 * @param pTableMetaInfo table meta info with a non-empty name
 * @return TSDB_CODE_SUCCESS on a cache hit, otherwise the result of
 *         getTableMetaFromMgmt
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // give back the meta currently owned by this info, if any
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  STableMeta *pCached = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  pTableMetaInfo->pTableMeta = pCached;

  if (pCached == NULL) {
    // cache miss: fetch the meta through a separate request
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pCached);
  tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pCached);

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Same as tscGetTableMeta, but records beforehand in pSql->cmd.autoCreated
 * whether the table should be created if it does not exist.
 *
 * @param pSql              sql object on whose behalf the meta is needed
 * @param pTableMetaInfo    table whose meta is needed
 * @param createIfNotExists value stored in pSql->cmd.autoCreated
 * @return the result of tscGetTableMeta
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}
|
|
|
|
/*
|
|
* in handling the renew metermeta problem during insertion,
|
|
*
|
|
* If the meter is created on demand during insertion, the routine usually waits for a short
|
|
* period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
|
|
* successfully created the corresponding table.
|
|
*/
|
|
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
|
|
if (pCmd->command == TSDB_SQL_INSERT) {
|
|
taosMsleep(50); // todo: global config
|
|
}
|
|
}
|
|
|
|
/**
|
|
* in renew metermeta, do not retrieve metadata in cache.
|
|
* @param pSql sql object
|
|
* @param tableId meter id
|
|
* @return status code
|
|
*/
|
|
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
|
|
int code = 0;
|
|
|
|
// handle table meta renew process
|
|
SSqlCmd *pCmd = &pSql->cmd;
|
|
|
|
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
|
|
|
/*
|
|
* 1. only update the metermeta in force model metricmeta is not updated
|
|
* 2. if get metermeta failed, still get the metermeta
|
|
*/
|
|
if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnMetric(pCmd)) {
|
|
if (pTableMetaInfo->pTableMeta) {
|
|
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
|
|
pTableMetaInfo->numOfTags, pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->pTableMeta);
|
|
}
|
|
|
|
tscWaitingForCreateTable(pCmd);
|
|
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
|
|
|
|
code = getTableMetaFromMgmt(pSql, pTableMetaInfo); // todo ??
|
|
} else {
|
|
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
|
|
tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
|
|
pTableMetaInfo->pTableMeta);
|
|
}
|
|
|
|
return code;
|
|
}
|
|
|
|
/*
 * Make sure the vgroup list of the super table queried by the given clause is
 * available.
 *
 * If the first table meta info of the clause already carries a vgroupIdList
 * nothing is done. Otherwise a new sql object (TSDB_SQL_STABLEVGROUP) is
 * created, given one table meta info per table of the clause plus a copy of
 * the tag condition, group-by, slimit and order settings, and submitted; the
 * result is delivered through tscTableMetaCallBack with pSql as parameter.
 *
 * The new object is released with tscFreeSqlObj on every setup failure. The
 * compiled-out metric-meta cache lookup that used to sit in this function has
 * been removed.
 *
 * @param pSql        sql object owning the query
 * @param clauseIndex index of the clause whose tables are looked up
 * @return TSDB_CODE_SUCCESS if the list is already present,
 *         TSDB_CODE_ACTION_IN_PROGRESS if a request was issued,
 *         TSDB_CODE_CLI_OUT_OF_MEMORY if the new sql object cannot be allocated,
 *         or the error code of the failing setup step / tscProcessSql
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  if (pQueryInfo->pTableMetaInfo[0]->vgroupIdList != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p malloc failed for new sqlobj to get stable vgroupInfo", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  int         code = TSDB_CODE_SUCCESS;
  SQueryInfo *pNewQueryInfo = NULL;
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);  // do not leak the half-initialized object
    return code;
  }

  // mirror every table of the clause into the new query
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMMInfo = tscGetMetaInfo(pQueryInfo, i);

    STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);

  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;

  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;

  STagCond *pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
           pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
           pNewQueryInfo->order.order, pTagCond->tbnameCond.cond);

  tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew);
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is on its way; the result arrives through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}
|
|
|
|
void tscInitMsgsFp() {
|
|
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
|
|
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
|
|
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
|
|
|
|
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
|
|
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
|
|
|
|
tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
|
|
tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
|
|
|
|
tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
|
|
tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
|
|
tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
|
|
tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
|
|
tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
|
|
tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
|
|
tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
|
|
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
|
|
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
|
|
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
|
|
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
|
|
|
|
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
|
|
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
|
|
tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
|
|
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
|
|
tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
|
|
|
|
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
|
|
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
|
|
tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
|
|
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
|
|
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
|
|
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
|
|
|
|
tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;
|
|
|
|
tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
|
|
|
|
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode; // rsp handled by same function.
|
|
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
|
|
|
|
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
|
|
|
|
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
|
|
|
|
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;
|
|
|
|
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
|
|
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
|
|
|
|
tscKeepConn[TSDB_SQL_SHOW] = 1;
|
|
tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
|
|
tscKeepConn[TSDB_SQL_SELECT] = 1;
|
|
tscKeepConn[TSDB_SQL_FETCH] = 1;
|
|
tscKeepConn[TSDB_SQL_HB] = 1;
|
|
}
|