fix the bugs in issue #932. [tbase-1353]

This commit is contained in:
hjxilinx 2019-12-17 13:46:31 +08:00
parent 14be179081
commit 05ce45e76e
6 changed files with 91 additions and 64 deletions

View File

@ -1482,6 +1482,46 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) {
return size; return size;
} }
static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vnodeId, char* pMsg) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
tscTrace("%p vid:%d, query on %d meters", pSql, htons(vnodeId), numOfMeters);
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%lld", pSql, pMeterMetaInfo->pMeterMeta->sid, pMeterMetaInfo->pMeterMeta->uid);
#endif
SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
pMeterInfo->sid = htonl(pMeterMeta->sid);
pMeterInfo->uid = htobe64(pMeterMeta->uid);
pMsg += sizeof(SMeterSidExtInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfMeters; ++i) {
SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pMeterInfo->sid = htonl(pQueryMeterInfo->sid);
pMeterInfo->uid = htobe64(pQueryMeterInfo->uid);
pMsg += sizeof(SMeterSidExtInfo);
memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
pMsg += pMetricMeta->tagLen;
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%lld", pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
#endif
}
}
return pMsg;
}
int tscBuildQueryMsg(SSqlObj *pSql) { int tscBuildQueryMsg(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
@ -1512,7 +1552,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pQueryMsg->uid = pMeterMeta->uid; pQueryMsg->uid = pMeterMeta->uid;
pQueryMsg->numOfTagsCols = 0; pQueryMsg->numOfTagsCols = 0;
} else { // query on metric } else { // query on super table
if (pMeterMetaInfo->vnodeIndex < 0) { if (pMeterMetaInfo->vnodeIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex); tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
return -1; return -1;
@ -1699,34 +1739,8 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->colNameLen = htonl(len); pQueryMsg->colNameLen = htonl(len);
// set sids list // serialize the table info (sid, uid, tags)
tscTrace("%p vid:%d, query on %d meters", pSql, htons(pQueryMsg->vnode), numOfMeters); pMsg = doSerializeTableInfo(pSql, numOfMeters, htons(pQueryMsg->vnode), pMsg);
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p %d", pSql, pMeterMetaInfo->pMeterMeta->sid);
#endif
SMeterSidExtInfo *pSMeterTagInfo = (SMeterSidExtInfo *)pMsg;
pSMeterTagInfo->sid = htonl(pMeterMeta->sid);
pMsg += sizeof(SMeterSidExtInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfMeters; ++i) {
SMeterSidExtInfo *pMeterTagInfo = (SMeterSidExtInfo *)pMsg;
SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pMeterTagInfo->sid = htonl(pQueryMeterInfo->sid);
pMsg += sizeof(SMeterSidExtInfo);
#ifdef _DEBUG_VIEW
tscTrace("%p %d", pSql, pQueryMeterInfo->sid);
#endif
memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
pMsg += pMetricMeta->tagLen;
}
}
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode // only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pMeterMetaInfo->numOfTags > 0) { if (pMeterMetaInfo->numOfTags > 0) {
@ -3226,44 +3240,47 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfMeters * sizeof(SMeterSidExtInfo *); size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfMeters * sizeof(SMeterSidExtInfo *);
char *pStr = calloc(1, size); char *pBuf = calloc(1, size);
if (pStr == NULL) { if (pBuf == NULL) {
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
goto _error_clean; goto _error_clean;
} }
SMetricMeta *pNewMetricMeta = (SMetricMeta *)pStr; SMetricMeta *pNewMetricMeta = (SMetricMeta *)pBuf;
metricMetaList[k] = pNewMetricMeta; metricMetaList[k] = pNewMetricMeta;
pNewMetricMeta->numOfMeters = pMeta->numOfMeters; pNewMetricMeta->numOfMeters = pMeta->numOfMeters;
pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes; pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
pNewMetricMeta->tagLen = pMeta->tagLen; pNewMetricMeta->tagLen = pMeta->tagLen;
pStr = pStr + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *); pBuf = pBuf + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) { for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
SVnodeSidList *pSidLists = (SVnodeSidList *)rsp; SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
memcpy(pStr, pSidLists, sizeof(SVnodeSidList)); memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
pNewMetricMeta->list[i] = pStr - (char *)pNewMetricMeta; // offset value pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta; // offset value
SVnodeSidList *pLists = (SVnodeSidList *)pStr; SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
tscTrace("%p metricmeta:vid:%d,numOfMeters:%d", pSql, i, pLists->numOfSids); tscTrace("%p metricmeta:vid:%d,numOfMeters:%d", pSql, i, pLists->numOfSids);
pStr += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids; pBuf += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids;
rsp += sizeof(SVnodeSidList); rsp += sizeof(SVnodeSidList);
size_t sidSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen; size_t elemSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen;
for (int32_t j = 0; j < pSidLists->numOfSids; ++j) { for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
pLists->pSidExtInfoList[j] = pStr - (char *)pLists; pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
memcpy(pStr, rsp, sidSize); memcpy(pBuf, rsp, elemSize);
rsp += sidSize; ((SMeterSidExtInfo*) pBuf)->uid = htobe64(((SMeterSidExtInfo*) pBuf)->uid);
pStr += sidSize; ((SMeterSidExtInfo*) pBuf)->sid = htonl(((SMeterSidExtInfo*) pBuf)->sid);
rsp += elemSize;
pBuf += elemSize;
} }
} }
sizes[k] = pStr - (char *)pNewMetricMeta; sizes[k] = pBuf - (char *)pNewMetricMeta;
} }
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {

View File

@ -136,8 +136,9 @@ extern "C" {
#define TSDB_CODE_INVALID_TABLE_ID 115 #define TSDB_CODE_INVALID_TABLE_ID 115
#define TSDB_CODE_INVALID_VNODE_STATUS 116 #define TSDB_CODE_INVALID_VNODE_STATUS 116
#define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117 #define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117
#define TSDB_CODE_TABLE_ID_MISMATCH 118
#define TSDB_CODE_MAX_ERROR_CODE 118 #define TSDB_CODE_MAX_ERROR_CODE 119
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -487,7 +487,7 @@ typedef struct SColumnInfo {
*/ */
typedef struct SMeterSidExtInfo { typedef struct SMeterSidExtInfo {
int32_t sid; int32_t sid;
void * pObj; int64_t uid;
char tags[]; char tags[];
} SMeterSidExtInfo; } SMeterSidExtInfo;
@ -724,9 +724,7 @@ typedef struct {
int32_t numOfMeters; int32_t numOfMeters;
int32_t join; int32_t join;
int32_t joinCondLen; // for join condition int32_t joinCondLen; // for join condition
int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM]; int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
} SMetricMetaMsg; } SMetricMetaMsg;
typedef struct { typedef struct {

View File

@ -238,8 +238,9 @@ char *tsError[] = {"success",
"only super table has metric meta info", "only super table has metric meta info",
"tags value not unique for join", "tags value not unique for join",
"invalid submit message", "invalid submit message",
"not active table(not created yet or dropped already)", //114 "not active table(not created yet or dropped already)",
"invalid table id", "invalid table id", // 115
"invalid vnode status", //116 "invalid vnode status",
"failed to lock resources", "failed to lock resources",
"table id/uid mismatch", // 118
}; };

View File

@ -1097,10 +1097,12 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pSids[0] = (SMeterSidExtInfo *)pMsg; pSids[0] = (SMeterSidExtInfo *)pMsg;
pSids[0]->sid = htonl(pSids[0]->sid); pSids[0]->sid = htonl(pSids[0]->sid);
pSids[0]->uid = htobe64(pSids[0]->uid);
for (int32_t j = 1; j < pQueryMsg->numOfSids; ++j) { for (int32_t j = 1; j < pQueryMsg->numOfSids; ++j) {
pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength); pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength);
pSids[j]->sid = htonl(pSids[j]->sid); pSids[j]->sid = htonl(pSids[j]->sid);
pSids[j]->uid = htobe64(pSids[j]->uid);
} }
pMsg = (char *)pSids[pQueryMsg->numOfSids - 1]; pMsg = (char *)pSids[pQueryMsg->numOfSids - 1];

View File

@ -28,7 +28,7 @@
#include "vnodeRead.h" #include "vnodeRead.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#include "vnodeStore.h" #include "vnodeStore.h"
#include "tstatus.h" #include "vnodeStatus.h"
extern int tsMaxQueues; extern int tsMaxQueues;
@ -297,7 +297,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
} }
if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) { if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) {
dTrace("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode); dError("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_TABLE_ID; code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over; goto _query_over;
} }
@ -312,31 +312,39 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
} }
if (!(pVnode->accessState & TSDB_VN_READ_ACCCESS)) { if (!(pVnode->accessState & TSDB_VN_READ_ACCCESS)) {
dError("qmsg:%p,vid:%d access not allowed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NO_READ_ACCESS; code = TSDB_CODE_NO_READ_ACCESS;
goto _query_over; goto _query_over;
} }
if (pQueryMsg->pSidExtInfo == 0) {
dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
if (pVnode->meterList == NULL) { if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode); dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE; code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over; goto _query_over;
} }
if (pQueryMsg->pSidExtInfo == 0) {
dError("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
pSids = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo; pSids = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo;
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
if (pSids[i]->sid >= pVnode->cfg.maxSessions || pSids[i]->sid < 0) { if (pSids[i]->sid >= pVnode->cfg.maxSessions || pSids[i]->sid < 0) {
dTrace("qmsg:%p sid:%d is out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, dError("qmsg:%p sid:%d out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, pVnode->cfg.maxSessions);
pVnode->cfg.maxSessions);
code = TSDB_CODE_INVALID_TABLE_ID; code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over; goto _query_over;
} }
SMeterObj* pMeterObj = pVnode->meterList[pSids[i]->sid];
if (pMeterObj->uid != pSids[i]->uid || pMeterObj->sid != pSids[i]->sid) { // uid/sid not match, error in query msg
dError("qmsg:%p sid/uid mismatch, vid:%d sid:%d id:%s uid:%" ", in msg sid:%d, uid:%lld", pQueryMsg,
pQueryMsg->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterObj->uid, pSids[i]->sid, pSids[i]->uid);
code = TSDB_CODE_TABLE_ID_MISMATCH;
goto _query_over;
}
} }
// todo optimize for single table query process // todo optimize for single table query process