fix bugs #436
This commit is contained in:
parent
b3cb1e20f9
commit
920edd131d
|
@ -14,14 +14,13 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <mgmt.h>
|
|
||||||
#include <taosmsg.h>
|
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
|
|
||||||
#include "dnodeSystem.h"
|
#include "dnodeSystem.h"
|
||||||
#include "mgmt.h"
|
#include "mgmt.h"
|
||||||
#include "mgmtProfile.h"
|
#include "mgmtProfile.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
#pragma GCC diagnostic ignored "-Wint-conversion"
|
#pragma GCC diagnostic ignored "-Wint-conversion"
|
||||||
#pragma GCC diagnostic ignored "-Wpointer-sign"
|
#pragma GCC diagnostic ignored "-Wpointer-sign"
|
||||||
|
|
||||||
|
@ -119,6 +118,27 @@ static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp *
|
||||||
return pStart;
|
return pStart;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one.
|
||||||
|
*
|
||||||
|
* @param pMsg
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
bool mgmtCheckMeterMetaMsgType(char *pMsg) {
|
||||||
|
SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
|
||||||
|
|
||||||
|
int16_t autoCreate = htons(pInfo->createFlag);
|
||||||
|
STabObj *pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||||
|
|
||||||
|
// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue
|
||||||
|
bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1);
|
||||||
|
if (addIntoTranQueue) {
|
||||||
|
mTrace("meter:%s auto created task added", pInfo->meterId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return addIntoTranQueue;
|
||||||
|
}
|
||||||
|
|
||||||
int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||||
SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
|
SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
|
||||||
STabObj * pMeterObj = NULL;
|
STabObj * pMeterObj = NULL;
|
||||||
|
@ -133,7 +153,8 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||||
int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS +
|
int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS +
|
||||||
sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE;
|
sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE;
|
||||||
|
|
||||||
if ((pConn->pDb != NULL && pConn->pDb->dropStatus != TSDB_DB_STATUS_READY) || pConn->pDb == NULL) {
|
if (pConn->pDb == NULL || (pConn->pDb != NULL && pConn->pDb->dropStatus != TSDB_DB_STATUS_READY)) {
|
||||||
|
// todo handle failed to allocate msg buffer
|
||||||
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
|
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -145,14 +166,24 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeterObj = mgmtGetMeter(pInfo->meterId);
|
pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||||
|
|
||||||
|
// on demand create table from super table if meter does not exists
|
||||||
if (pMeterObj == NULL && pInfo->createFlag == 1) {
|
if (pMeterObj == NULL && pInfo->createFlag == 1) {
|
||||||
// create the meter objects if not exists
|
|
||||||
SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData));
|
SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData));
|
||||||
|
if (pCreateMsg == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
|
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
|
||||||
strcpy(pCreateMsg->meterId, pInfo->meterId);
|
strcpy(pCreateMsg->meterId, pInfo->meterId);
|
||||||
// todo handle if not master mnode
|
|
||||||
int32_t code = mgmtCreateMeter(pConn->pDb, pCreateMsg);
|
int32_t code = mgmtCreateMeter(pConn->pDb, pCreateMsg);
|
||||||
mTrace("meter:%s is automatically created by %s, code:%d", pCreateMsg->meterId, pConn->pUser->user, code);
|
|
||||||
|
char stableName[TSDB_METER_ID_LEN] = {0};
|
||||||
|
strncpy(stableName, pInfo->tags, TSDB_METER_ID_LEN);
|
||||||
|
mTrace("meter:%s is automatically created by %s from %s, code:%d", pCreateMsg->meterId, pConn->pUser->user,
|
||||||
|
stableName, code);
|
||||||
|
|
||||||
tfree(pCreateMsg);
|
tfree(pCreateMsg);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -180,7 +211,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||||
pRsp->code = TSDB_CODE_DB_NOT_SELECTED;
|
pRsp->code = TSDB_CODE_DB_NOT_SELECTED;
|
||||||
pMsg++;
|
pMsg++;
|
||||||
} else {
|
} else {
|
||||||
mTrace("meter:%s, meta is retrieved from:%s", pInfo->meterId, pMeterObj->meterId);
|
mTrace("%s, uid:%lld meter meta is retrieved", pInfo->meterId, pMeterObj->uid);
|
||||||
pRsp->code = 0;
|
pRsp->code = 0;
|
||||||
pMsg += sizeof(STaosRsp);
|
pMsg += sizeof(STaosRsp);
|
||||||
*pMsg = TSDB_IE_TYPE_META;
|
*pMsg = TSDB_IE_TYPE_META;
|
||||||
|
@ -199,8 +230,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||||
pMeta->meterType = htons(pMeterObj->meterType);
|
pMeta->meterType = htons(pMeterObj->meterType);
|
||||||
|
|
||||||
pMsg += sizeof(SMeterMeta);
|
pMsg += sizeof(SMeterMeta);
|
||||||
pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta
|
pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta struct
|
||||||
// struct
|
|
||||||
|
|
||||||
if (mgmtMeterCreateFromMetric(pMeterObj)) {
|
if (mgmtMeterCreateFromMetric(pMeterObj)) {
|
||||||
assert(pMeterObj->numOfTags == 0);
|
assert(pMeterObj->numOfTags == 0);
|
||||||
|
@ -208,8 +238,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||||
STabObj *pMetric = mgmtGetMeter(pMeterObj->pTagData);
|
STabObj *pMetric = mgmtGetMeter(pMeterObj->pTagData);
|
||||||
uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
|
uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
|
||||||
|
|
||||||
pMeta->numOfTags = htons(pMetric->numOfTags); // update the numOfTags
|
pMeta->numOfTags = htons(pMetric->numOfTags); // update the numOfTags info
|
||||||
// info
|
|
||||||
mgmtSetSchemaFromMeters(pSchema, pMetric, numOfTotalCols);
|
mgmtSetSchemaFromMeters(pSchema, pMetric, numOfTotalCols);
|
||||||
pMsg += numOfTotalCols * sizeof(SSchema);
|
pMsg += numOfTotalCols * sizeof(SSchema);
|
||||||
|
|
||||||
|
@ -233,7 +262,10 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||||
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
||||||
goto _exit_code;
|
goto _exit_code;
|
||||||
}
|
}
|
||||||
pMeta->vpeerDesc[0].vnode = htonl(pVgroup->vnodeGid[0].vnode);
|
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
|
||||||
|
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
|
||||||
|
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -934,12 +966,14 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
||||||
|
|
||||||
char *cont = (char *)pMsg->content + sizeof(SMgmtHead);
|
char *cont = (char *)pMsg->content + sizeof(SMgmtHead);
|
||||||
int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead);
|
int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead);
|
||||||
if (pMsg->msgType == TSDB_MSG_TYPE_METERINFO || pMsg->msgType == TSDB_MSG_TYPE_METRIC_META ||
|
|
||||||
pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || pMsg->msgType == TSDB_MSG_TYPE_SHOW) {
|
// read-only request can be executed concurrently
|
||||||
|
if ((pMsg->msgType == TSDB_MSG_TYPE_METERINFO && (!mgmtCheckMeterMetaMsgType(cont))) ||
|
||||||
|
pMsg->msgType == TSDB_MSG_TYPE_METRIC_META || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE ||
|
||||||
|
pMsg->msgType == TSDB_MSG_TYPE_SHOW) {
|
||||||
(*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn);
|
(*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn);
|
||||||
} else {
|
} else {
|
||||||
if (mgmtProcessShellMsg[pMsg->msgType]) {
|
if (mgmtProcessShellMsg[pMsg->msgType]) {
|
||||||
// TODO : put the msg in tran queue
|
|
||||||
SSchedMsg schedMsg;
|
SSchedMsg schedMsg;
|
||||||
schedMsg.msg = malloc(pMsg->msgLen); // Message to deal with
|
schedMsg.msg = malloc(pMsg->msgLen); // Message to deal with
|
||||||
memcpy(schedMsg.msg, pMsg, pMsg->msgLen);
|
memcpy(schedMsg.msg, pMsg, pMsg->msgLen);
|
||||||
|
|
Loading…
Reference in New Issue