Merge pull request #933 from taosdata/feature/liaohj

Feature/liaohj
This commit is contained in:
slguan 2019-12-17 15:22:05 +08:00 committed by GitHub
commit ef6df5093d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 511 additions and 377 deletions

View File

@ -1957,7 +1957,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, int32_t colIdx, tSQLExprItem* pItem
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByNameEx(&pParamElem->pNode->colInfo, pCmd, &index) != TSDB_CODE_SUCCESS) {
if ((getColumnIndexByNameEx(&pParamElem->pNode->colInfo, pCmd, &index) != TSDB_CODE_SUCCESS) ||
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidSqlErrMsg(pCmd, msg3);
}
@ -1966,7 +1967,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, int32_t colIdx, tSQLExprItem* pItem
SSchema* pSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, index.columnIndex);
int16_t colType = pSchema->type;
if (colType == TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) {
if (colType <= TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) {
return invalidSqlErrMsg(pCmd, msg1);
}
@ -2336,7 +2337,7 @@ static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex
for (int32_t i = 0; i < pCmd->numOfTables; ++i) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i);
extractMeterName(pMeterMetaInfo->name, tableName);
extractTableName(pMeterMetaInfo->name, tableName);
if (strncasecmp(tableName, pTableToken->z, pTableToken->n) == 0 && strlen(tableName) == pTableToken->n) {
pIndex->tableIndex = i;
@ -5468,15 +5469,16 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd) {
int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
const char* msg1 = "functions/columns not allowed in group by query";
const char* msg2 = "projection query on columns not allowed";
const char* msg3 = "group by not allowed on projection query";
const char* msg5 = "retrieve tags not compatible with group by or interval query";
const char* msg4 = "retrieve tags not compatible with group by or interval query";
SSqlCmd* pCmd = &pSql->cmd;
// only retrieve tags, group by is not supportted
if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) {
if (pCmd->groupbyExpr.numOfGroupCols > 0 || pCmd->nAggTimeInterval > 0) {
return invalidSqlErrMsg(pCmd, msg5);
return invalidSqlErrMsg(pCmd, msg4);
} else {
return TSDB_CODE_SUCCESS;
}
@ -5509,7 +5511,7 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
}
if (!qualified) {
return TSDB_CODE_INVALID_SQL;
return invalidSqlErrMsg(pCmd, msg2);
}
}

View File

@ -131,35 +131,39 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) {
}
// todo refactor
static FORCE_INLINE char* skipSegments(char* input, char delimiter, int32_t num) {
static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delimiter) {
while (*input != 0 && *input++ != delim) {
};
}
return input;
}
static FORCE_INLINE void copySegment(char* dst, char* src, char delimiter) {
static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
size_t len = 0;
while (*src != delimiter && *src != 0) {
*dst++ = *src++;
len++;
}
return len;
}
/**
* extract meter name from meterid, which the format of userid.dbname.metername
* extract table name from meterid, which the format of userid.dbname.metername
* @param meterId
* @return
*/
void extractMeterName(char* meterId, char* name) {
void extractTableName(char* meterId, char* name) {
char* r = skipSegments(meterId, TS_PATH_DELIMITER[0], 2);
copySegment(name, r, TS_PATH_DELIMITER[0]);
copy(name, r, TS_PATH_DELIMITER[0]);
}
SSQLToken extractDBName(char* meterId, char* name) {
char* r = skipSegments(meterId, TS_PATH_DELIMITER[0], 1);
copySegment(name, r, TS_PATH_DELIMITER[0]);
size_t len = copy(name, r, TS_PATH_DELIMITER[0]);
SSQLToken token = {.z = name, .n = strlen(name), .type = TK_STRING};
SSQLToken token = {.z = name, .n = len, .type = TK_STRING};
return token;
}

View File

@ -477,25 +477,17 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
if (code == 0) return pSql;
msg = NULL;
} else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || rspCode == TSDB_CODE_INVALID_VNODE_ID ||
rspCode == TSDB_CODE_TABLE_ID_MISMATCH || rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#else
if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || rspCode == TSDB_CODE_INVALID_VNODE_ID ||
rspCode == TSDB_CODE_TABLE_ID_MISMATCH || rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#endif
pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
(rspCode == TSDB_CODE_INVALID_TABLE_ID || rspCode == TSDB_CODE_INVALID_VNODE_ID)) {
/*
* In case of the insert/select operations, the invalid table(vnode) id means
* the submit/query msg is invalid, renew meter meta will not help to fix this problem,
* so return the invalid_query_msg to client directly.
*/
code = TSDB_CODE_INVALID_QUERY_MSG;
} else if (pCmd->command == TSDB_SQL_CONNECT) {
if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL;
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
@ -1482,6 +1474,46 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) {
return size;
}
static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vnodeId, char* pMsg) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
tscTrace("%p vid:%d, query on %d meters", pSql, htons(vnodeId), numOfMeters);
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%lld", pSql, pMeterMetaInfo->pMeterMeta->sid, pMeterMetaInfo->pMeterMeta->uid);
#endif
SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
pMeterInfo->sid = htonl(pMeterMeta->sid);
pMeterInfo->uid = htobe64(pMeterMeta->uid);
pMsg += sizeof(SMeterSidExtInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfMeters; ++i) {
SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pMeterInfo->sid = htonl(pQueryMeterInfo->sid);
pMeterInfo->uid = htobe64(pQueryMeterInfo->uid);
pMsg += sizeof(SMeterSidExtInfo);
memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
pMsg += pMetricMeta->tagLen;
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%lld", pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
#endif
}
}
return pMsg;
}
int tscBuildQueryMsg(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
@ -1512,7 +1544,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pQueryMsg->uid = pMeterMeta->uid;
pQueryMsg->numOfTagsCols = 0;
} else { // query on metric
} else { // query on super table
if (pMeterMetaInfo->vnodeIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
return -1;
@ -1699,34 +1731,8 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->colNameLen = htonl(len);
// set sids list
tscTrace("%p vid:%d, query on %d meters", pSql, htons(pQueryMsg->vnode), numOfMeters);
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p %d", pSql, pMeterMetaInfo->pMeterMeta->sid);
#endif
SMeterSidExtInfo *pSMeterTagInfo = (SMeterSidExtInfo *)pMsg;
pSMeterTagInfo->sid = htonl(pMeterMeta->sid);
pMsg += sizeof(SMeterSidExtInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfMeters; ++i) {
SMeterSidExtInfo *pMeterTagInfo = (SMeterSidExtInfo *)pMsg;
SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pMeterTagInfo->sid = htonl(pQueryMeterInfo->sid);
pMsg += sizeof(SMeterSidExtInfo);
#ifdef _DEBUG_VIEW
tscTrace("%p %d", pSql, pQueryMeterInfo->sid);
#endif
memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
pMsg += pMetricMeta->tagLen;
}
}
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfMeters, htons(pQueryMsg->vnode), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pMeterMetaInfo->numOfTags > 0) {
@ -2317,7 +2323,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql) {
size = tscEstimateCreateTableMsgLength(pSql);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for create table msg", pSql);
free(tmpData);
free(tmpData);
return -1;
}
@ -3228,44 +3234,47 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfMeters * sizeof(SMeterSidExtInfo *);
char *pStr = calloc(1, size);
if (pStr == NULL) {
char *pBuf = calloc(1, size);
if (pBuf == NULL) {
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
goto _error_clean;
}
SMetricMeta *pNewMetricMeta = (SMetricMeta *)pStr;
SMetricMeta *pNewMetricMeta = (SMetricMeta *)pBuf;
metricMetaList[k] = pNewMetricMeta;
pNewMetricMeta->numOfMeters = pMeta->numOfMeters;
pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
pNewMetricMeta->tagLen = pMeta->tagLen;
pStr = pStr + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
pBuf = pBuf + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
memcpy(pStr, pSidLists, sizeof(SVnodeSidList));
memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
pNewMetricMeta->list[i] = pStr - (char *)pNewMetricMeta; // offset value
SVnodeSidList *pLists = (SVnodeSidList *)pStr;
pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta; // offset value
SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
tscTrace("%p metricmeta:vid:%d,numOfMeters:%d", pSql, i, pLists->numOfSids);
pStr += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids;
pBuf += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids;
rsp += sizeof(SVnodeSidList);
size_t sidSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen;
size_t elemSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen;
for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
pLists->pSidExtInfoList[j] = pStr - (char *)pLists;
memcpy(pStr, rsp, sidSize);
rsp += sidSize;
pStr += sidSize;
pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
memcpy(pBuf, rsp, elemSize);
((SMeterSidExtInfo*) pBuf)->uid = htobe64(((SMeterSidExtInfo*) pBuf)->uid);
((SMeterSidExtInfo*) pBuf)->sid = htonl(((SMeterSidExtInfo*) pBuf)->sid);
rsp += elemSize;
pBuf += elemSize;
}
}
sizes[k] = pStr - (char *)pNewMetricMeta;
sizes[k] = pBuf - (char *)pNewMetricMeta;
}
for (int32_t i = 0; i < num; ++i) {

View File

@ -136,8 +136,9 @@ extern "C" {
#define TSDB_CODE_INVALID_TABLE_ID 115
#define TSDB_CODE_INVALID_VNODE_STATUS 116
#define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117
#define TSDB_CODE_TABLE_ID_MISMATCH 118
#define TSDB_CODE_MAX_ERROR_CODE 118
#define TSDB_CODE_MAX_ERROR_CODE 119
#ifdef __cplusplus
}

View File

@ -489,7 +489,7 @@ typedef struct SColumnInfo {
*/
typedef struct SMeterSidExtInfo {
int32_t sid;
void * pObj;
int64_t uid;
char tags[];
} SMeterSidExtInfo;
@ -727,9 +727,7 @@ typedef struct {
int32_t numOfMeters;
int32_t join;
int32_t joinCondLen; // for join condition
int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
} SMetricMetaMsg;
typedef struct {

View File

@ -32,6 +32,8 @@ typedef struct _sched_msg {
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label);
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl);
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg);
void taosCleanUpScheduler(void *param);

View File

@ -53,7 +53,7 @@ char *tsGetTagsValue(SMeterMeta *pMeta);
bool tsMeterMetaIdentical(SMeterMeta *p1, SMeterMeta *p2);
void extractMeterName(char *meterId, char *name);
void extractTableName(char *meterId, char *name);
SSQLToken extractDBName(char *meterId, char *name);

View File

@ -238,8 +238,9 @@ char *tsError[] = {"success",
"only super table has metric meta info",
"tags value not unique for join",
"invalid submit message",
"not active table(not created yet or dropped already)", //114
"invalid table id",
"invalid vnode status", //116
"not active table(not created yet or dropped already)",
"invalid table id", // 115
"invalid vnode status",
"failed to lock resources",
"table id/uid mismatch", // 118
};

View File

@ -25,7 +25,7 @@ extern "C" {
#include "dnodeSystem.h"
#include "mgmt.h"
#include "tglobalcfg.h"
#include "tstatus.h"
#include "vnodeStatus.h"
#include "ttime.h"
void mgmtCreateDnodeOrderList();

View File

@ -64,15 +64,6 @@ enum _sync_cmd {
TSDB_SYNC_CMD_REMOVE,
};
enum _meter_state {
TSDB_METER_STATE_READY = 0x00,
TSDB_METER_STATE_INSERT = 0x01,
TSDB_METER_STATE_IMPORTING = 0x02,
TSDB_METER_STATE_UPDATING = 0x04,
TSDB_METER_STATE_DELETING = 0x10,
TSDB_METER_STATE_DELETED = 0x18,
};
typedef struct {
int64_t offset : 48;
int64_t length : 16;
@ -267,9 +258,7 @@ typedef struct SQuery {
int16_t checkBufferInLoop; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t rowSize;
int32_t dataRowSize; // row size of each loaded data from disk, the value is
// used for prepare buffer
SSqlGroupbyExpr * pGroupbyExpr;
SSqlFunctionExpr * pSelectExpr;
SColumnInfoEx * colList;

View File

@ -40,6 +40,7 @@ typedef struct SQueryLoadBlockInfo {
int32_t fileId;
int32_t slotIdx;
int32_t sid;
bool tsLoaded; // if timestamp column of current block is loaded or not
} SQueryLoadBlockInfo;
typedef struct SQueryLoadCompBlockInfo {

View File

@ -78,15 +78,26 @@ enum _TSDB_VN_STREAM_STATUS {
TSDB_VN_STREAM_STATUS_START
};
const char* taosGetVgroupStatusStr(int vgroupStatus);
const char* taosGetDbStatusStr(int dbStatus);
const char* taosGetVnodeStatusStr(int vnodeStatus);
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus);
const char* taosGetVnodeDropStatusStr(int dropping);
const char* taosGetDnodeStatusStr(int dnodeStatus);
const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus);
const char* taosGetVgroupLbStatusStr(int vglbStatus);
const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus);
enum TSDB_TABLE_STATUS {
TSDB_METER_STATE_READY = 0x00,
TSDB_METER_STATE_INSERTING = 0x01,
TSDB_METER_STATE_IMPORTING = 0x02,
TSDB_METER_STATE_UPDATING = 0x04,
TSDB_METER_STATE_DROPPING = 0x10,
TSDB_METER_STATE_DROPPED = 0x18,
};
const char* taosGetVgroupStatusStr(int32_t vgroupStatus);
const char* taosGetDbStatusStr(int32_t dbStatus);
const char* taosGetVnodeStatusStr(int32_t vnodeStatus);
const char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus);
const char* taosGetVnodeDropStatusStr(int32_t dropping);
const char* taosGetDnodeStatusStr(int32_t dnodeStatus);
const char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus);
const char* taosGetVgroupLbStatusStr(int32_t vglbStatus);
const char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus);
const char* taosGetTableStatusStr(int32_t tableStatus);
#ifdef __cplusplus
}

View File

@ -26,7 +26,7 @@
#include "vnodeMgmt.h"
#include "vnodeSystem.h"
#include "vnodeUtil.h"
#include "tstatus.h"
#include "vnodeStatus.h"
SMgmtObj mgmtObj;
extern uint64_t tsCreatedTime;

View File

@ -20,7 +20,7 @@
#include "mgmtBalance.h"
#include "mgmtUtil.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "vnodeStatus.h"
void *dbSdb = NULL;
int tsDbUpdateSize;

View File

@ -20,8 +20,7 @@
#include "dnodeSystem.h"
#include "mgmt.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "tstatus.h"
#include "vnodeStatus.h"
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
int mgmtGetDnodesNum();

View File

@ -27,7 +27,7 @@
#include "tsqlfunction.h"
#include "ttime.h"
#include "vnodeTagMgmt.h"
#include "tstatus.h"
#include "vnodeStatus.h"
extern int64_t sdbVersion;
@ -984,6 +984,28 @@ SSchema *mgmtGetMeterSchema(STabObj *pMeter) {
return (SSchema *)pMetric->schema;
}
static int32_t mgmtSerializeTagValue(char* pMsg, STabObj* pMeter, int16_t* tagsId, int32_t numOfTags) {
int32_t offset = 0;
for (int32_t j = 0; j < numOfTags; ++j) {
if (tagsId[j] == TSDB_TBNAME_COLUMN_INDEX) { // handle the table name tags
char name[TSDB_METER_NAME_LEN] = {0};
extractTableName(pMeter->meterId, name);
memcpy(pMsg + offset, name, TSDB_METER_NAME_LEN);
offset += TSDB_METER_NAME_LEN;
} else {
SSchema s = {0};
char * tag = mgmtMeterGetTag(pMeter, tagsId[j], &s);
memcpy(pMsg + offset, tag, (size_t)s.bytes);
offset += s.bytes;
}
}
return offset;
}
/*
* serialize SVnodeSidList to byte array
*/
@ -996,7 +1018,6 @@ static char *mgmtBuildMetricMetaMsg(SConnObj *pConn, STabObj *pMeter, int32_t *o
* 1. the query msg may be larger than 64k,
* 2. the following meters belong to different vnodes
*/
(*pList) = (SVnodeSidList *)pMsg;
(*pList)->numOfSids = 0;
(*pList)->index = 0;
@ -1020,29 +1041,15 @@ static char *mgmtBuildMetricMetaMsg(SConnObj *pConn, STabObj *pMeter, int32_t *o
(*pList)->numOfSids++;
SMeterSidExtInfo *pSMeterTagInfo = (SMeterSidExtInfo *)pMsg;
pSMeterTagInfo->sid = pMeter->gid.sid;
pSMeterTagInfo->sid = htonl(pMeter->gid.sid);
pSMeterTagInfo->uid = htobe64(pMeter->uid);
pMsg += sizeof(SMeterSidExtInfo);
int32_t offset = 0;
for (int32_t j = 0; j < numOfTags; ++j) {
if (tagsId[j] == -1) {
char name[TSDB_METER_NAME_LEN] = {0};
extractMeterName(pMeter->meterId, name);
memcpy(pMsg + offset, name, TSDB_METER_NAME_LEN);
offset += TSDB_METER_NAME_LEN;
} else {
SSchema s = {0};
char * tag = mgmtMeterGetTag(pMeter, tagsId[j], &s);
memcpy(pMsg + offset, tag, (size_t)s.bytes);
offset += s.bytes;
}
}
pMsg += offset;
int32_t offset = mgmtSerializeTagValue(pMsg, pMeter, tagsId, numOfTags);
assert(offset == tagLen);
pMsg += offset;
return pMsg;
}
@ -1214,12 +1221,9 @@ int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SMetricMetaMsg *pMetr
#endif
if (ret == TSDB_CODE_SUCCESS) {
// todo opt performance
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
ret = mgmtRetrieveMetersFromMetric(pMetricMetaMsg, i, &result[i]);
// todo opt performance
// if (result[i].num <= 0) {//no result
// } else if (result[i].num < 10) {
// }
}
}
@ -1287,7 +1291,7 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
memset(meterName, 0, tListLen(meterName));
// pattern compare for meter name
extractMeterName(pMeter->meterId, meterName);
extractTableName(pMeter->meterId, meterName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
@ -1309,7 +1313,7 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pMeter->pTagData) {
extractMeterName(pMeter->pTagData, pWrite);
extractTableName(pMeter->pTagData, pWrite);
}
cols++;
@ -1393,7 +1397,7 @@ int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
pShow->pNode = (void *)pMetric->next;
memset(metricName, 0, tListLen(metricName));
extractMeterName(pMetric->meterId, metricName);
extractTableName(pMetric->meterId, metricName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
@ -1402,7 +1406,7 @@ int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
extractMeterName(pMetric->meterId, pWrite);
extractTableName(pMetric->meterId, pWrite);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

View File

@ -21,7 +21,7 @@
#include "mgmtProfile.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tstatus.h"
#include "vnodeStatus.h"
#define MAX_LEN_OF_METER_META (sizeof(SMultiMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN)

View File

@ -196,7 +196,7 @@ static bool mgmtTablenameFilterCallback(tSkipListNode* pNode, void* param) {
// pattern compare for meter name
STabObj* pMeterObj = (STabObj*)pNode->pData;
extractMeterName(pMeterObj->meterId, name);
extractTableName(pMeterObj->meterId, name);
return patternMatch(pSupporter->pattern, name, TSDB_METER_ID_LEN, &pSupporter->info) == TSDB_PATTERN_MATCH;
}
@ -786,7 +786,7 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer
// todo refactor!!!!!
static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len, char* param) {
if (offset == TSDB_TBNAME_COLUMN_INDEX) {
extractMeterName(pMeter->meterId, param);
extractTableName(pMeter->meterId, param);
} else {
char* tags = pMeter->pTagData + offset + TSDB_METER_ID_LEN; // tag start position
memcpy(param, tags, len); // make sure the value is null-terminated string

View File

@ -19,7 +19,7 @@
#include "mgmt.h"
#include "tschemautil.h"
#include "tlog.h"
#include "tstatus.h"
#include "vnodeStatus.h"
void * vgSdb = NULL;
int tsVgUpdateSize;

View File

@ -20,7 +20,7 @@
#include "vnode.h"
#include "vnodeCache.h"
#include "vnodeUtil.h"
#include "tstatus.h"
#include "vnodeStatus.h"
void vnodeSearchPointInCache(SMeterObj *pObj, SQuery *pQuery);
void vnodeProcessCommitTimer(void *param, void *tmrId);

View File

@ -19,6 +19,7 @@
#include "tsdb.h"
#include "vnode.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
typedef struct {
int sversion;
@ -165,7 +166,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
continue;
}
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, ignore data in commit log, contLen:%d action:%d",
vnode, head.sid, head.contLen, head.action);
continue;

View File

@ -21,6 +21,7 @@
#include "vnode.h"
#include "vnodeFile.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
#define FILE_QUERY_NEW_BLOCK -5 // a special negative number
@ -611,7 +612,7 @@ _again:
}
// meter is going to be deleted, abort
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) {
dWarn("vid:%d sid:%d is dropped, ignore this meter", vnode, sid);
continue;
}

View File

@ -18,6 +18,7 @@
#include "vnode.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
extern void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId);
extern int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize,

View File

@ -24,7 +24,7 @@
#include "vnodeMgmt.h"
#include "vnodeShell.h"
#include "vnodeUtil.h"
#include "tstatus.h"
#include "vnodeStatus.h"
#define VALID_TIMESTAMP(key, curKey, prec) (((key) >= 0) && ((key) <= ((curKey) + 36500 * tsMsPerDay[prec])))
@ -520,7 +520,7 @@ int vnodeRemoveMeterObj(int vnode, int sid) {
}
// after remove this meter, change its state to DELETED
pObj->state = TSDB_METER_STATE_DELETED;
pObj->state = TSDB_METER_STATE_DROPPED;
pObj->timeStamp = taosGetTimestampMs();
vnodeList[vnode].lastRemove = pObj->timeStamp;
@ -612,12 +612,12 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_INSERT)) != TSDB_CODE_SUCCESS) {
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_INSERTING)) != TSDB_CODE_SUCCESS) {
goto _over;
}
for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
@ -660,7 +660,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pthread_mutex_unlock(&(pVnode->vmutex));
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERTING);
_over:
dTrace("vid:%d sid:%d id:%s, %d out of %d points are inserted, lastKey:%ld source:%d, vnode total storage: %ld",
@ -726,7 +726,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
}
SMeterObj *pObj = pVnode->meterList[pNew->sid];
if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) {
dTrace("vid:%d sid:%d id:%s, meter is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew->schema);
free(pNew);
@ -734,7 +734,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
}
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_UPDATING);
if (state >= TSDB_METER_STATE_DELETING) {
if (state >= TSDB_METER_STATE_DROPPING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
return;

View File

@ -31,6 +31,7 @@
#include "vnodeDataFilterFunc.h"
#include "vnodeFile.h"
#include "vnodeQueryImpl.h"
#include "vnodeStatus.h"
enum {
TS_JOIN_TS_EQUAL = 0,
@ -38,10 +39,16 @@ enum {
TS_JOIN_TAG_NOT_EQUALS = 2,
};
enum {
DISK_BLOCK_NO_NEED_TO_LOAD = 0,
DISK_BLOCK_LOAD_TS = 1,
DISK_BLOCK_LOAD_BLOCK = 2,
};
#define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0)
//static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset,
// int32_t size);
// static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t
// offset, int32_t size);
static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset,
int32_t size);
@ -68,17 +75,17 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
__block_search_fn_t searchFn);
static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data,
int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus,
SField *pFields, __block_search_fn_t searchFn);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data,
int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus,
SField *pFields, __block_search_fn_t searchFn);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx);
static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv);
static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
const SQueryRuntimeEnv *pRuntimeEnv);
static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
// check the offset value integrity
static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data,
@ -121,8 +128,8 @@ static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const c
return 0;
}
static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo, char *pBlock,
int32_t vid, TSCKSUM checksum) {
static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo,
char *pBlock, int32_t vid, TSCKSUM checksum) {
uint32_t size = compInfo->numOfBlocks * sizeof(SCompBlock);
if (checksum != taosCalcChecksum(0, (uint8_t *)pBlock, size)) {
@ -195,7 +202,8 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
// if vnodeFreeFields is called, the pQuery->pFields is NULL
if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid &&
pQuery->pFields != NULL && pQuery->fileId > 0) {
assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0);
assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId &&
pQuery->numOfBlocks > 0);
return true;
}
@ -216,7 +224,8 @@ static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadIn
pCompBlockLoadInfo->fileListIndex = -1;
}
static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) {
static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool loadPrimaryTS) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
@ -224,13 +233,20 @@ static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMe
if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 &&
pLoadInfo->sid == pMeterObj->sid) {
assert(fileIndex == pLoadInfo->fileListIndex);
return true;
// previous load operation does not load the primary timestamp column, we only need to load the timestamp column
if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) {
return DISK_BLOCK_LOAD_TS;
} else {
return DISK_BLOCK_NO_NEED_TO_LOAD;
}
}
return false;
return DISK_BLOCK_LOAD_BLOCK;
}
static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) {
static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool tsLoaded) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
@ -238,6 +254,7 @@ static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
pLoadInfo->slotIdx = pQuery->slot;
pLoadInfo->fileListIndex = fileIndex;
pLoadInfo->sid = pMeterObj->sid;
pLoadInfo->tsLoaded = tsLoaded;
}
static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
@ -247,35 +264,35 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
pBlockLoadInfo->fileListIndex = -1;
}
static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) {
static void vnodeSetOpenedFileNames(SQueryFilesInfo *pVnodeFilesInfo) {
assert(pVnodeFilesInfo->current >= 0 && pVnodeFilesInfo->current < pVnodeFilesInfo->numOfFiles);
SHeaderFileInfo* pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current];
SHeaderFileInfo *pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current];
/*
* set the full file path for current opened files
* the maximum allowed path string length is PATH_MAX in Linux, 100 bytes is used to
* suppress the compiler warnings
*/
char str[PATH_MAX + 100] = {0};
char str[PATH_MAX + 100] = {0};
int32_t PATH_WITH_EXTRA = PATH_MAX + 100;
int32_t vnodeId = pVnodeFilesInfo->vnodeId;
int32_t fileId = pCurrentFileInfo->fileID;
int32_t len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.head", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId);
assert(len <= PATH_MAX);
strncpy(pVnodeFilesInfo->headerFilePath, str, PATH_MAX);
len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.data", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId);
assert(len <= PATH_MAX);
strncpy(pVnodeFilesInfo->dataFilePath, str, PATH_MAX);
len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.last", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId);
assert(len <= PATH_MAX);
strncpy(pVnodeFilesInfo->lastFilePath, str, PATH_MAX);
}
@ -287,31 +304,31 @@ static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) {
* @return
*/
static FORCE_INLINE bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg;
SVnodeCfg *pVnodeCfg = &vnodeList[vnodeId].cfg;
return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg);
}
static bool checkIsHeaderFileEmpty(SQueryFilesInfo* pVnodeFilesInfo, int32_t vnodeId) {
static bool checkIsHeaderFileEmpty(SQueryFilesInfo *pVnodeFilesInfo, int32_t vnodeId) {
struct stat fstat = {0};
if (stat(pVnodeFilesInfo->headerFilePath, &fstat) < 0) {
return true;
}
pVnodeFilesInfo->headFileSize = fstat.st_size;
return isHeaderFileEmpty(vnodeId, pVnodeFilesInfo->headFileSize);
}
static void doCloseQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) {
static void doCloseQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
tclose(pVnodeFilesInfo->headerFd);
tclose(pVnodeFilesInfo->dataFd);
tclose(pVnodeFilesInfo->lastFd);
}
static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) {
static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
pVnodeFilesInfo->current = -1;
pVnodeFilesInfo->headFileSize = -1;
pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value
pVnodeFilesInfo->dataFd = FD_INITIALIZER;
pVnodeFilesInfo->lastFd = FD_INITIALIZER;
@ -320,15 +337,15 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) {
/*
* clean memory and other corresponding resources are delegated to invoker
*/
static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileInfo, int32_t vnodeId) {
SHeaderFileInfo* pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current];
static int32_t doOpenQueryFileData(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo, int32_t vnodeId) {
SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current];
pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->headerFd)) {
dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno));
return -1;
}
/*
* current header file is empty or broken, return directly.
*
@ -339,55 +356,54 @@ static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileIn
if (checkIsHeaderFileEmpty(pVnodeFileInfo, vnodeId)) {
qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo,
pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headFileSize);
return -1;
}
pVnodeFileInfo->dataFd = open(pVnodeFileInfo->dataFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->dataFd)) {
dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFileInfo->dataFilePath, strerror(errno));
return -1;
}
pVnodeFileInfo->lastFd = open(pVnodeFileInfo->lastFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->lastFd)) {
dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFileInfo->lastFilePath, strerror(errno));
return -1;
}
pVnodeFileInfo->pHeaderFileData = mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED,
pVnodeFileInfo->headerFd, 0);
pVnodeFileInfo->pHeaderFileData =
mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->headerFd, 0);
if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) {
pVnodeFileInfo->pHeaderFileData = NULL;
doCloseQueryFileInfoFD(pVnodeFileInfo);
doInitQueryFileInfoFD(pVnodeFileInfo);
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath,
pVnodeFileInfo->headFileSize, strerror(errno));
return -1;
} else {
if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) {
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
}
}
return TSDB_CODE_SUCCESS;
}
static void doUnmapHeaderFile(SQueryFilesInfo* pVnodeFileInfo) {
static void doUnmapHeaderFile(SQueryFilesInfo *pVnodeFileInfo) {
munmap(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize);
pVnodeFileInfo->pHeaderFileData = NULL;
pVnodeFileInfo->headFileSize = -1;
}
static void doCloseOpenedFileData(SQueryFilesInfo* pVnodeFileInfo) {
static void doCloseOpenedFileData(SQueryFilesInfo *pVnodeFileInfo) {
if (pVnodeFileInfo->current >= 0) {
assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0);
doUnmapHeaderFile(pVnodeFileInfo);
doCloseQueryFileInfoFD(pVnodeFileInfo);
doInitQueryFileInfoFD(pVnodeFileInfo);
@ -412,22 +428,22 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output
SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) {
if (pVnodeFileInfo->current >= 0) {
assert(pVnodeFileInfo->pHeaderFileData != NULL);
}
// do close the current memory mapped header file and corresponding fd
doCloseOpenedFileData(pVnodeFileInfo);
assert(pVnodeFileInfo->pHeaderFileData == NULL);
// set current opened file Index
pVnodeFileInfo->current = fileIndex;
// set the current opened files(header, data, last) path
vnodeSetOpenedFileNames(pVnodeFileInfo);
if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) {
doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway.
return pVnodeFileInfo->pHeaderFileData;
@ -445,7 +461,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg;
SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg;
SHeaderFileInfo *pHeadeFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex];
int64_t st = taosGetTimestampUs();
@ -466,7 +482,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
if (data == NULL) {
return -1; // failed to load the header file data into memory
}
#else
char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN);
read(fd, data, tmsize + TSDB_FILE_HEADER_LEN);
@ -487,7 +503,8 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
}
// corrupted file may cause the invalid compInfoOffset, check needs
if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, getCompHeaderStartPosition(pCfg)) < 0) {
if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo,
getCompHeaderStartPosition(pCfg)) < 0) {
return -1;
}
@ -751,8 +768,7 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo
// load checksum
TSCKSUM checksum = 0;
ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len,
sizeof(TSCKSUM));
ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, sizeof(TSCKSUM));
if (ret != 0) {
return ret;
}
@ -774,11 +790,11 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo
}
static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
SQuery * pQuery = pRuntimeEnv->pQuery;
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SQueryFilesInfo *pVnodeFilesInfo = &pRuntimeEnv->vnodeFileInfo;
size_t size = sizeof(SField) * (pBlock->numOfCols) + sizeof(TSCKSUM);
// if *pField != NULL, this block is loaded once, in current query do nothing
@ -822,6 +838,21 @@ static void fillWithNull(SQuery *pQuery, char *dst, int32_t col, int32_t numOfPo
setNullN(dst, type, bytes, numOfPoints);
}
static int32_t loadPrimaryTSColumn(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField,
int32_t *columnBytes) {
SQuery *pQuery = pRuntimeEnv->pQuery;
assert(PRIMARY_TSCOL_LOADED(pQuery) == false);
if (columnBytes != NULL) {
(*columnBytes) += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM);
}
int32_t ret = loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX,
pRuntimeEnv->primaryColBuffer, pRuntimeEnv->unzipBuffer,
pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize);
return ret;
}
static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx,
bool loadPrimaryCol, bool loadSField) {
int32_t i = 0, j = 0;
@ -831,16 +862,40 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
SData ** sdata = pRuntimeEnv->colDataBuffer;
assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current);
SData **primaryTSBuf = &pRuntimeEnv->primaryColBuffer;
void * tmpBuf = pRuntimeEnv->unzipBuffer;
int32_t columnBytes = 0;
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
int32_t status = vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
if (status == DISK_BLOCK_NO_NEED_TO_LOAD) {
dTrace(
"QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d, "
"brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, loadPrimaryCol,
pQuery->slot, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
SData ** primaryTSBuf = &pRuntimeEnv->primaryColBuffer;
void * tmpBuf = pRuntimeEnv->unzipBuffer;
if (loadSField && (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL)) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
}
return TSDB_CODE_SUCCESS;
} else if (status == DISK_BLOCK_LOAD_TS) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, incrementally load ts",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId);
assert(PRIMARY_TSCOL_LOADED(pQuery) == false && loadSField == true);
if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
}
if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx)) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, data block has been loaded, ts:%d, slot:%d, brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, loadPrimaryCol, pQuery->slot,
pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
return 0;
// load primary timestamp
int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes);
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
return ret;
}
/* failed to load fields info, return with error info */
@ -848,21 +903,15 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
return -1;
}
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
int32_t columnBytes = 0;
int64_t st = taosGetTimestampUs();
if (loadPrimaryCol) {
if (PRIMARY_TSCOL_LOADED(pQuery)) {
*primaryTSBuf = sdata[0];
} else {
columnBytes += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM);
int32_t ret =
loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, *primaryTSBuf,
tmpBuf, pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize);
if (ret != 0) {
return -1;
int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
pSummary->numOfSeek++;
@ -936,7 +985,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
pSummary->loadBlocksUs += (et - st);
pSummary->readDiskBlocks++;
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx);
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
return ret;
}
@ -1848,8 +1897,8 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
return -1;
}
SQueryFilesInfo* pVnodeFiles = &pRuntimeEnv->vnodeFileInfo;
SQueryFilesInfo *pVnodeFiles = &pRuntimeEnv->vnodeFileInfo;
/* set the initial file for current query */
if (order == TSQL_SO_ASC && *fid < pVnodeFiles->pFileInfo[0].fileID) {
*fid = pVnodeFiles->pFileInfo[0].fileID;
@ -2255,7 +2304,7 @@ bool isQueryKilled(SQuery *pQuery) {
* if it will be deleted soon, stop current query ASAP.
*/
SMeterObj *pMeterObj = pQInfo->pObj;
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DROPPING)) {
pQInfo->killed = 1;
return true;
}
@ -2967,7 +3016,7 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p
* currently opened file is not the start file, reset to the start file
*/
int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order);
if (fileIdx < 0) { // ignore the files on disk
if (fileIdx < 0) { // ignore the files on disk
dError("QInfo:%p failed to get data file:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId);
position->fileId = -1;
return -1;
@ -3061,7 +3110,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
SQueryFilesInfo *pVnodeFilesInfo = &(pQInfo->pMeterQuerySupporter->runtimeEnv.vnodeFileInfo);
pVnodeFilesInfo->vnodeId = vnodeId;
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) {
@ -3116,10 +3165,11 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
closedir(pDir);
dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
pVnodeFilesInfo->dbFilePathPrefix);
pVnodeFilesInfo->dbFilePathPrefix);
/* order the files information according their names */
qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo), file_order_comparator);
qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo),
file_order_comparator);
}
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) {
@ -3602,9 +3652,9 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu
int32_t offset = 0;
for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i, ++j) {
pInterpoSupport->pPrevPoint[j] = prev + offset;
pInterpoSupport->pNextPoint[j] = next + offset;
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
pInterpoSupport->pPrevPoint[i] = prev + offset;
pInterpoSupport->pNextPoint[i] = next + offset;
offset += pQuery->colList[i].data.bytes;
}
@ -3702,7 +3752,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
pQuery->lastKey = pQuery->skey;
doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo);
vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo);
vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo);
@ -3871,7 +3921,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pQuery->pointsRead = 0;
changeExecuteScanOrder(pQuery, true);
doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo);
vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo);
vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo);
@ -3933,16 +3983,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pSupporter->numOfPages = pSupporter->numOfMeters;
ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
if (ret != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
strerror(errno));
strerror(errno));
return TSDB_CODE_SERV_NO_DISKSPACE;
}
pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
pSupporter->lastPageId = -1;
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
@ -4026,7 +4076,7 @@ TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index) {
/*
* NOTE: pQuery->pos will not change, the corresponding data block will be loaded into buffer
* loadDataBlockOnDemand will change the value of pQuery->pos, according to the pQuery->lastKey
* */
*/
TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
SQuery *pQuery = pRuntimeEnv->pQuery;
@ -4045,23 +4095,16 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
bool loadTimestamp = true;
int32_t fileId = pQuery->fileId;
int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&fileId, pRuntimeEnv, pQuery->order.order);
if (!vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIndex)) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot);
// todo handle failed to load data, file corrupted
// todo refactor the return value
int32_t ret =
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot);
int32_t ret =
loadDataBlockIntoMem(pBlock, &pQuery->pFields[pQuery->slot], pRuntimeEnv, fileIndex, loadTimestamp, true);
UNUSED(ret);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
// the fields info is not loaded, load it into memory
if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
}
SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus);
SET_FILE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
@ -4757,17 +4800,17 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) {
int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1];
ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end);
if (ret < 0) { // not enough disk space to save the data into disk
if (ret < 0) { // not enough disk space to save the data into disk
return -1;
}
pSupporter->subgroupIdx += 1;
// this group generates at least one result, return results
if (ret > 0) {
break;
}
assert(pSupporter->numOfGroupResultPages == 0);
dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1);
}
@ -4784,7 +4827,7 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
// current results of group has been sent to client, try next group
if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) {
return; // failed to save data in the disk
return; // failed to save data in the disk
}
// set current query completed
@ -4866,7 +4909,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
return -1;
}
resetMergeResultBuf(pQuery, pCtx);
}
@ -4914,11 +4957,12 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
if (buffer[0]->numOfElems != 0) { // there are data in buffer
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), pSupporter->extBufFile);
dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery),
pSupporter->extBufFile);
tfree(pTree);
tfree(pValidMeter);
tfree(posArray);
return -1;
}
}
@ -4939,11 +4983,11 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
return pSupporter->numOfGroupResultPages;
}
static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) {
static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) {
assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize);
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pQuery);
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
pSupporter->numOfPages = numOfPages;
@ -4957,26 +5001,27 @@ static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSuppo
strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE;
pQInfo->killed = 1;
return pQInfo->code;
}
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
pSupporter->meterOutputMMapBuf =
mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
pQInfo->killed = 1;
return pQInfo->code;
}
return TSDB_CODE_SUCCESS;
}
int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) {
int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1;
int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE +
pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1);
@ -5039,7 +5084,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
@ -5047,7 +5092,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
}
}
}
return TSDB_CODE_SUCCESS;
}
@ -5355,6 +5400,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
// usually this load operation will incure load disk block operation
TSKEY endKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->endPos);
assert((QUERY_IS_ASC_QUERY(pQuery) && endKey <= pQuery->ekey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && endKey >= pQuery->ekey));
@ -5600,18 +5646,19 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SVnodeObj *pVnode = &vnodeList[vid];
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex);
if (pHeaderFileData == NULL) { // failed to load header file into buffer
char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex);
if (pHeaderFileData == NULL) { // failed to load header file into buffer
return 0;
}
int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM);
// file is corrupted, abort query in current file
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < 0) {
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) <
0) {
*numOfMeters = 0;
return 0;
}
@ -5754,7 +5801,7 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
}
}
static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) {
static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) {
if (pSupporter->lastPageId == pSupporter->numOfPages - 1) {
if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) {
return NULL;
@ -5765,9 +5812,10 @@ static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter
return getFilePage(pSupporter, *pageId);
}
tFilePage *addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) {
uint32_t pageId = 0;
tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo,
SMeterQuerySupportObj *pSupporter) {
uint32_t pageId = 0;
tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId);
if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results
return NULL;
@ -5907,7 +5955,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery,
* @return
*/
uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, char *pHeaderData,
int32_t numOfMeters, const char* filePath, SMeterDataInfo **pMeterDataInfo) {
int32_t numOfMeters, const char *filePath, SMeterDataInfo **pMeterDataInfo) {
uint32_t numOfBlocks = 0;
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary;
@ -6273,8 +6321,8 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
tFilePage * pData = NULL;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
// in the first scan, new space needed for results
if (pMeterQueryInfo->numOfPages == 0) {
pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter);
@ -6289,7 +6337,7 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
}
}
}
if (pData == NULL) {
return -1;
}
@ -6298,12 +6346,12 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i);
pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i];
}
return TSDB_CODE_SUCCESS;
}
int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx,
SMeterQueryInfo *pMeterQueryInfo) {
SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
if (IS_MASTER_SCAN(pRuntimeEnv)) {
@ -6620,8 +6668,8 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) {
int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
@ -6826,7 +6874,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage);
#endif
}
return TSDB_CODE_SUCCESS;
}
@ -7077,7 +7125,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
SQuery * pQuery = &pQInfo->query;
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
// for metric query, bufIndex always be 0.
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes;

View File

@ -193,8 +193,6 @@ static SQInfo *vnodeAllocateQInfoCommon(SQueryMeterMsg *pQueryMsg, SMeterObj *pM
} else {
pQuery->colList[i].data.filters = NULL;
}
pQuery->dataRowSize += colList[i].bytes;
}
vnodeUpdateQueryColumnIndex(pQuery, pMeterObj);
@ -1099,10 +1097,12 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pSids[0] = (SMeterSidExtInfo *)pMsg;
pSids[0]->sid = htonl(pSids[0]->sid);
pSids[0]->uid = htobe64(pSids[0]->uid);
for (int32_t j = 1; j < pQueryMsg->numOfSids; ++j) {
pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength);
pSids[j]->sid = htonl(pSids[j]->sid);
pSids[j]->uid = htobe64(pSids[j]->uid);
}
pMsg = (char *)pSids[pQueryMsg->numOfSids - 1];

View File

@ -28,7 +28,7 @@
#include "vnodeRead.h"
#include "vnodeUtil.h"
#include "vnodeStore.h"
#include "tstatus.h"
#include "vnodeStatus.h"
extern int tsMaxQueues;
@ -295,7 +295,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
}
if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) {
dTrace("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
dError("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
@ -310,28 +310,27 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
}
if (!(pVnode->accessState & TSDB_VN_READ_ACCCESS)) {
dError("qmsg:%p,vid:%d access not allowed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NO_READ_ACCESS;
goto _query_over;
}
if (pQueryMsg->pSidExtInfo == 0) {
dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
if (pQueryMsg->pSidExtInfo == 0) {
dError("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
pSids = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo;
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
if (pSids[i]->sid >= pVnode->cfg.maxSessions || pSids[i]->sid < 0) {
dTrace("qmsg:%p sid:%d is out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0,
pVnode->cfg.maxSessions);
dError("qmsg:%p sid:%d out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, pVnode->cfg.maxSessions);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}

View File

@ -15,9 +15,9 @@
#include "taosmsg.h"
#include "tsdb.h"
#include "tstatus.h"
#include "vnodeStatus.h"
const char* taosGetVgroupStatusStr(int vgroupStatus) {
const char* taosGetVgroupStatusStr(int32_t vgroupStatus) {
switch (vgroupStatus) {
case TSDB_VG_STATUS_READY: return "ready";
case TSDB_VG_STATUS_IN_PROGRESS: return "inprogress";
@ -28,7 +28,7 @@ const char* taosGetVgroupStatusStr(int vgroupStatus) {
}
}
const char* taosGetDbStatusStr(int dbStatus) {
const char* taosGetDbStatusStr(int32_t dbStatus) {
switch (dbStatus) {
case TSDB_DB_STATUS_READY: return "ready";
case TSDB_DB_STATUS_DROPPING: return "dropping";
@ -37,7 +37,7 @@ const char* taosGetDbStatusStr(int dbStatus) {
}
}
const char* taosGetVnodeStatusStr(int vnodeStatus) {
const char* taosGetVnodeStatusStr(int32_t vnodeStatus) {
switch (vnodeStatus) {
case TSDB_VN_STATUS_OFFLINE: return "offline";
case TSDB_VN_STATUS_CREATING: return "creating";
@ -50,7 +50,7 @@ const char* taosGetVnodeStatusStr(int vnodeStatus) {
}
}
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) {
const char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus) {
switch (vnodeSyncStatus) {
case TSDB_VN_SYNC_STATUS_INIT: return "init";
case TSDB_VN_SYNC_STATUS_SYNCING: return "syncing";
@ -60,7 +60,7 @@ const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) {
}
}
const char* taosGetVnodeDropStatusStr(int dropping) {
const char* taosGetVnodeDropStatusStr(int32_t dropping) {
switch (dropping) {
case TSDB_VN_DROP_STATUS_READY: return "ready";
case TSDB_VN_DROP_STATUS_DROPPING: return "dropping";
@ -68,7 +68,7 @@ const char* taosGetVnodeDropStatusStr(int dropping) {
}
}
const char* taosGetDnodeStatusStr(int dnodeStatus) {
const char* taosGetDnodeStatusStr(int32_t dnodeStatus) {
switch (dnodeStatus) {
case TSDB_DN_STATUS_OFFLINE: return "offline";
case TSDB_DN_STATUS_READY: return "ready";
@ -76,7 +76,7 @@ const char* taosGetDnodeStatusStr(int dnodeStatus) {
}
}
const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus) {
const char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus) {
switch (dnodeBalanceStatus) {
case TSDB_DN_LB_STATUS_BALANCED: return "balanced";
case TSDB_DN_LB_STATUS_BALANCING: return "balancing";
@ -86,7 +86,7 @@ const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus) {
}
}
const char* taosGetVgroupLbStatusStr(int vglbStatus) {
const char* taosGetVgroupLbStatusStr(int32_t vglbStatus) {
switch (vglbStatus) {
case TSDB_VG_LB_STATUS_READY: return "ready";
case TSDB_VG_LB_STATUS_UPDATE: return "updating";
@ -94,10 +94,22 @@ const char* taosGetVgroupLbStatusStr(int vglbStatus) {
}
}
const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus) {
const char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus) {
switch (vnodeStreamStatus) {
case TSDB_VN_STREAM_STATUS_START: return "start";
case TSDB_VN_STREAM_STATUS_STOP: return "stop";
default: return "undefined";
}
}
const char* taosGetTableStatusStr(int32_t tableStatus) {
switch(tableStatus) {
case TSDB_METER_STATE_INSERTING: return "inserting";
case TSDB_METER_STATE_IMPORTING:return "importing";
case TSDB_METER_STATE_UPDATING: return "updating";
case TSDB_METER_STATE_DROPPING: return "deleting";
case TSDB_METER_STATE_DROPPED: return "dropped";
case TSDB_METER_STATE_READY: return "ready";
default:return "undefined";
}
}

View File

@ -22,7 +22,7 @@
#include "vnode.h"
#include "vnodeStore.h"
#include "vnodeUtil.h"
#include "tstatus.h"
#include "vnodeStatus.h"
int tsMaxVnode = -1;
int tsOpenVnodes = 0;
@ -118,7 +118,7 @@ static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) {
} else { // set the meter is to be deleted
SMeterObj* pObj = pVnode->meterList[sid];
if (pObj != NULL) {
pObj->state = TSDB_METER_STATE_DELETED;
pObj->state = TSDB_METER_STATE_DROPPED;
}
}
}

View File

@ -17,7 +17,7 @@
#include "taosmsg.h"
#include "vnode.h"
#include "vnodeUtil.h"
#include "tstatus.h"
#include "vnodeStatus.h"
/* static TAOS *dbConn = NULL; */
void vnodeCloseStreamCallback(void *param);
@ -86,7 +86,7 @@ void vnodeOpenStreams(void *param, void *tmrId) {
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
pObj = pVnode->meterList[sid];
if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) continue;
if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue;
dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql);

View File

@ -36,8 +36,14 @@ void vnodeCleanUpSystem() {
bool vnodeInitQueryHandle() {
int numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore;
if (numOfThreads < 1) numOfThreads = 1;
queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query");
if (numOfThreads < 1) {
numOfThreads = 1;
}
int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode;
dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize,numOfThreads);
queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl);
return true;
}
@ -52,15 +58,15 @@ bool vnodeInitTmrCtl() {
int vnodeInitSystem() {
if (!vnodeInitQueryHandle()) {
dError("failed to init query qhandle, exit");
return -1;
}
if (!vnodeInitTmrCtl()) {
dError("failed to init timer, exit");
return -1;
}
if (!vnodeInitQueryHandle()) {
dError("failed to init query qhandle, exit");
return -1;
}
if (vnodeInitStore() < 0) {
dError("failed to init vnode storage");

View File

@ -22,6 +22,7 @@
#include "vnode.h"
#include "vnodeDataFilterFunc.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
int vnodeCheckFileIntegrity(FILE* fp) {
/*
@ -547,30 +548,38 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
SMeterObj* pMeter = pVnode->meterList[pSids[i]->sid];
if (pMeter == NULL || (pMeter->state > TSDB_METER_STATE_INSERT)) {
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETING)) {
code = TSDB_CODE_NOT_ACTIVE_TABLE;
dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
} else {//update or import
code = TSDB_CODE_ACTION_IN_PROGRESS;
dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%d, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid,
pMeter->meterId, pMeter->state);
}
} else {
/*
* vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can
* check if the numOfQueries is 0 or not.
*/
pMeterObjList[(*numOfInc)++] = pMeter;
atomic_fetch_add_32(&pMeter->numOfQueries, 1);
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DROPPING)) {
code = TSDB_CODE_NOT_ACTIVE_TABLE;
dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
continue;
} else if (pMeter->uid != pSids[i]->uid || pMeter->sid != pSids[i]->sid) {
code = TSDB_CODE_TABLE_ID_MISMATCH;
dError("qmsg:%p, vid:%d sid:%d id:%s uid:%lld, id mismatch. sid:%d uid:%lld in msg", pQueryMsg,
pQueryMsg->vnode, pMeter->sid, pMeter->meterId, pMeter->uid, pSids[i]->sid, pSids[i]->uid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
continue;
} else if (pMeter->state > TSDB_METER_STATE_INSERTING) { //update or import
code = TSDB_CODE_ACTION_IN_PROGRESS;
dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%s, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid,
pMeter->meterId, taosGetTableStatusStr(pMeter->state));
continue;
}
/*
* vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can
* check if the numOfQueries is 0 or not.
*/
pMeterObjList[(*numOfInc)++] = pMeter;
atomic_fetch_add_32(&pMeter->numOfQueries, 1);
// output for meter more than one query executed
if (pMeter->numOfQueries > 1) {
dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid,
pMeter->meterId, pMeter->numOfQueries);
num++;
}
// output for meter more than one query executed
if (pMeter->numOfQueries > 1) {
dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid,
pMeter->meterId, pMeter->numOfQueries);
num++;
}
}
@ -652,7 +661,7 @@ void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state) {
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state) {
if (state == TSDB_METER_STATE_READY) {
return pMeterObj->state == TSDB_METER_STATE_READY;
} else if (state == TSDB_METER_STATE_DELETING) {
} else if (state == TSDB_METER_STATE_DROPPING) {
return pMeterObj->state >= state;
} else {
return (((pMeterObj->state) & state) == state);
@ -664,7 +673,7 @@ void vnodeSetMeterDeleting(SMeterObj* pMeterObj) {
return;
}
pMeterObj->state |= TSDB_METER_STATE_DELETING;
pMeterObj->state |= TSDB_METER_STATE_DROPPING;
}
int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) {
@ -672,7 +681,7 @@ int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) {
int32_t state = vnodeSetMeterState(pObj, st);
if (state != TSDB_METER_STATE_READY) {//return to denote import is not performed
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) {
dTrace("vid:%d sid:%d id:%s, meter is deleted, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
@ -690,17 +699,17 @@ int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) {
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) {
SMeterObj* pObj = pVnode->meterList[sid];
if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETED)) {
if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPED)) {
return true;
}
int32_t prev = vnodeSetMeterState(pObj, TSDB_METER_STATE_DELETING);
int32_t prev = vnodeSetMeterState(pObj, TSDB_METER_STATE_DROPPING);
/*
* if the meter is not in ready/deleting state, it must be in insert/import/update,
* set the deleting state and wait the procedure to be completed
*/
if (prev != TSDB_METER_STATE_READY && prev < TSDB_METER_STATE_DELETING) {
if (prev != TSDB_METER_STATE_READY && prev < TSDB_METER_STATE_DROPPING) {
vnodeSetMeterDeleting(pObj);
dWarn("vid:%d sid:%d id:%s, can not be deleted, state:%d, wait", pObj->vnode, pObj->sid, pObj->meterId, prev);
@ -710,7 +719,7 @@ bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) {
bool ready = true;
/*
* the query will be stopped ASAP, since the state of meter is set to TSDB_METER_STATE_DELETING,
* the query will be stopped ASAP, since the state of meter is set to TSDB_METER_STATE_DROPPING,
* and new query will abort since the meter is deleted.
*/
pthread_mutex_lock(&pVnode->vmutex);

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "mgmtBalance.h"
#include "tstatus.h"
#include "vnodeStatus.h"
void mgmtStartBalanceTimer(int64_t mseconds) {}

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "mgmt.h"
#include "tstatus.h"
#include "vnodeStatus.h"
SDnodeObj dnodeObj;
extern uint32_t tsRebootTime;

View File

@ -23,7 +23,7 @@
#include "tutil.h"
#include "vnode.h"
#include "tsystem.h"
#include "tstatus.h"
#include "vnodeStatus.h"
extern void *dmQhandle;
void * mgmtStatusTimer = NULL;

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "vnode.h"
#include "tstatus.h"
#include "vnodeStatus.h"
int vnodeInitPeer(int numOfThreads) { return 0; }

View File

@ -16,6 +16,9 @@
#include "os.h"
#include "tlog.h"
#include "tsched.h"
#include "ttimer.h"
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
typedef struct {
char label[16];
@ -28,10 +31,13 @@ typedef struct {
int numOfThreads;
pthread_t * qthread;
SSchedMsg * queue;
void* pTmrCtrl;
void* pTimer;
} SSchedQueue;
void *taosProcessSchedQueue(void *param);
void taosCleanUpScheduler(void *param);
static void *taosProcessSchedQueue(void *param);
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
pthread_attr_t attr;
@ -96,6 +102,17 @@ _error:
return NULL;
}
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label);
if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl;
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
return pSched;
}
void *taosProcessSchedQueue(void *param) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
@ -173,8 +190,27 @@ void taosCleanUpScheduler(void *param) {
tsem_destroy(&pSched->emptySem);
tsem_destroy(&pSched->fullSem);
pthread_mutex_destroy(&pSched->queueMutex);
if (pSched->pTimer) {
taosTmrStopA(&pSched->pTimer);
}
free(pSched->queue);
free(pSched->qthread);
free(pSched); // fix memory leak
}
// for debug purpose, dump the scheduler status every 1min.
void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
if (pSched == NULL || pSched->pTimer == NULL || pSched->pTimer != tmrId) {
return;
}
int32_t size = ((pSched->emptySlot - pSched->fullSlot) + pSched->queueSize) % pSched->queueSize;
if (size > 0) {
pTrace("scheduler:%s, current tasks in queue:%d, task thread:%d", pSched->label, size, pSched->numOfThreads);
}
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}