Merge pull request #849 from taosdata/feature/liaohj

Feature/liaohj
This commit is contained in:
slguan 2019-12-04 18:44:50 +08:00 committed by GitHub
commit b908772583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 274 additions and 273 deletions

View File

@ -100,6 +100,9 @@
# default system charset # default system charset
# charset UTF-8 # charset UTF-8
# system time zone
# timezone Asia/Shanghai (CST, +0800)
# enable/disable commit log # enable/disable commit log
# clog 1 # clog 1

View File

@ -107,14 +107,6 @@ void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t
void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex); void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex);
//TODO refactor, remove
void SStringFree(SString* str);
void SStringCopy(SString* pDest, const SString* pSrc);
SString SStringCreate(const char* str);
int32_t SStringAlloc(SString* pStr, int32_t size);
int32_t SStringEnsureRemain(SString* pStr, int32_t size);
int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex); int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex);
void tscClearInterpInfo(SSqlCmd* pCmd); void tscClearInterpInfo(SSqlCmd* pCmd);
@ -226,7 +218,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIn
void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex); void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(SSqlCmd* pCmd, uint64_t uid); int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid);
TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void (*fp)(void*, TAOS_RES*, int), TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void (*fp)(void*, TAOS_RES*, int),
void* param, void** taos); void* param, void** taos);

View File

@ -188,7 +188,7 @@ typedef struct SString {
typedef struct SCond { typedef struct SCond {
uint64_t uid; uint64_t uid;
SString cond; char* cond;
} SCond; } SCond;
typedef struct SJoinNode { typedef struct SJoinNode {

View File

@ -51,7 +51,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
} }
int32_t sqlLen = strlen(sqlstr); int32_t sqlLen = strlen(sqlstr);
if (sqlLen > TSDB_MAX_SQL_LEN) { if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string too long"); tscError("sql string too long");
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param);
return; return;

View File

@ -307,7 +307,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0); SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0);
assert(pNew->cmd.tagCond.joinInfo.hasJoin); assert(pNew->cmd.tagCond.joinInfo.hasJoin);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd, pMeterMetaInfo->pMeterMeta->uid); int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd.tagCond, pMeterMetaInfo->pMeterMeta->uid);
pExpr->param[0].i64Key = tagColIndex; pExpr->param[0].i64Key = tagColIndex;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;

View File

@ -21,6 +21,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tstoken.h" #include "tstoken.h"
#include "ttime.h" #include "ttime.h"
#include "tstrbuild.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
@ -3103,26 +3104,23 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tablenameListToString(tSQLExpr* pExpr, char* str) { static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/SStringBuilder* sb) {
tSQLExprList* pList = pExpr->pParam; tSQLExprList* pList = pExpr->pParam;
if (pList->nExpr <= 0) { if (pList->nExpr <= 0) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
if (pList->nExpr > 0) { if (pList->nExpr > 0) {
strcpy(str, QUERY_COND_REL_PREFIX_IN); taosStringBuilderAppendStringLen(sb, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN);
str += QUERY_COND_REL_PREFIX_IN_LEN;
} }
int32_t len = 0; int32_t len = 0;
for (int32_t i = 0; i < pList->nExpr; ++i) { for (int32_t i = 0; i < pList->nExpr; ++i) {
tSQLExpr* pSub = pList->a[i].pNode; tSQLExpr* pSub = pList->a[i].pNode;
strncpy(str + len, pSub->val.pz, pSub->val.nLen); taosStringBuilderAppendStringLen(sb, pSub->val.pz, pSub->val.nLen);
len += pSub->val.nLen;
if (i < pList->nExpr - 1) { if (i < pList->nExpr - 1) {
str[len++] = TBNAME_LIST_SEP[0]; taosStringBuilderAppendString(sb, TBNAME_LIST_SEP);
} }
if (pSub->val.nLen <= 0 || pSub->val.nLen > TSDB_METER_NAME_LEN) { if (pSub->val.nLen <= 0 || pSub->val.nLen > TSDB_METER_NAME_LEN) {
@ -3133,11 +3131,9 @@ static int32_t tablenameListToString(tSQLExpr* pExpr, char* str) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tablenameCondToString(tSQLExpr* pExpr, char* str) { static int32_t tablenameCondToString(tSQLExpr* pExpr, /*char* str*/SStringBuilder* sb) {
strcpy(str, QUERY_COND_REL_PREFIX_LIKE); taosStringBuilderAppendStringLen(sb, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN);
str += strlen(QUERY_COND_REL_PREFIX_LIKE); taosStringBuilderAppendString(sb, pExpr->val.pz);
strcpy(str, pExpr->val.pz);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3241,7 +3237,7 @@ static int32_t getTagCondString(SSqlCmd* pCmd, tSQLExpr* pExpr, char** str) {
return tSQLExprLeafToString(pExpr, true, str); return tSQLExprLeafToString(pExpr, true, str);
} }
static int32_t getTablenameCond(SSqlCmd* pCmd, tSQLExpr* pTableCond, char* str) { static int32_t getTablenameCond(SSqlCmd* pCmd, tSQLExpr* pTableCond, /*char* str*/SStringBuilder* sb) {
const char* msg0 = "invalid table name list"; const char* msg0 = "invalid table name list";
if (pTableCond == NULL) { if (pTableCond == NULL) {
@ -3258,9 +3254,9 @@ static int32_t getTablenameCond(SSqlCmd* pCmd, tSQLExpr* pTableCond, char* str)
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if (pTableCond->nSQLOptr == TK_IN) { if (pTableCond->nSQLOptr == TK_IN) {
ret = tablenameListToString(pRight, str); ret = tablenameListToString(pRight, sb);
} else if (pTableCond->nSQLOptr == TK_LIKE) { } else if (pTableCond->nSQLOptr == TK_LIKE) {
ret = tablenameCondToString(pRight, str); ret = tablenameCondToString(pRight, sb);
} }
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
@ -3828,8 +3824,7 @@ int tableNameCompar(const void* lhs, const void* rhs) {
return ret > 0 ? 1 : -1; return ret > 0 ? 1 : -1;
} }
static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_t tableCondIndex, static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_t tableCondIndex, SStringBuilder* sb) {
char* tmpTableCondBuf) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
const char* msg = "meter name too long"; const char* msg = "meter name too long";
@ -3842,26 +3837,25 @@ static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_
STagCond* pTagCond = &pSql->cmd.tagCond; STagCond* pTagCond = &pSql->cmd.tagCond;
pTagCond->tbnameCond.uid = pMeterMetaInfo->pMeterMeta->uid; pTagCond->tbnameCond.uid = pMeterMetaInfo->pMeterMeta->uid;
SString* pTableCond = &pCmd->tagCond.tbnameCond.cond;
SStringAlloc(pTableCond, 4096);
assert(pExpr->nSQLOptr == TK_LIKE || pExpr->nSQLOptr == TK_IN); assert(pExpr->nSQLOptr == TK_LIKE || pExpr->nSQLOptr == TK_IN);
if (pExpr->nSQLOptr == TK_LIKE) { if (pExpr->nSQLOptr == TK_LIKE) {
strcpy(pTableCond->z, tmpTableCondBuf); char* str = taosStringBuilderGetResult(sb, NULL);
pTableCond->n = strlen(pTableCond->z); pCmd->tagCond.tbnameCond.cond = strdup(str);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
strcpy(pTableCond->z, QUERY_COND_REL_PREFIX_IN); SStringBuilder sb1 = {0};
pTableCond->n += strlen(QUERY_COND_REL_PREFIX_IN); taosStringBuilderAppendStringLen(&sb1, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN);
char db[TSDB_METER_ID_LEN] = {0}; char db[TSDB_METER_ID_LEN] = {0};
// remove the duplicated input table names // remove the duplicated input table names
int32_t num = 0; int32_t num = 0;
char** segments = strsplit(tmpTableCondBuf + QUERY_COND_REL_PREFIX_IN_LEN, TBNAME_LIST_SEP, &num); char* tableNameString = taosStringBuilderGetResult(sb, NULL);
qsort(segments, num, sizeof(void*), tableNameCompar);
char** segments = strsplit(tableNameString + QUERY_COND_REL_PREFIX_IN_LEN, TBNAME_LIST_SEP, &num);
qsort(segments, num, POINTER_BYTES, tableNameCompar);
int32_t j = 1; int32_t j = 1;
for (int32_t i = 1; i < num; ++i) { for (int32_t i = 1; i < num; ++i) {
@ -3875,25 +3869,30 @@ static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_
char* acc = getAccountId(pSql); char* acc = getAccountId(pSql);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SStringEnsureRemain(pTableCond, TSDB_METER_ID_LEN);
if (i >= 1) { if (i >= 1) {
pTableCond->z[pTableCond->n++] = TBNAME_LIST_SEP[0]; taosStringBuilderAppendStringLen(&sb1, TBNAME_LIST_SEP, 1);
} }
char idBuf[TSDB_METER_ID_LEN + 1] = {0};
int32_t xlen = strlen(segments[i]); int32_t xlen = strlen(segments[i]);
SSQLToken t = {.z = segments[i], .n = xlen, .type = TK_STRING}; SSQLToken t = {.z = segments[i], .n = xlen, .type = TK_STRING};
int32_t ret = setObjFullName(pTableCond->z + pTableCond->n, acc, &dbToken, &t, &xlen); int32_t ret = setObjFullName(idBuf, acc, &dbToken, &t, &xlen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
taosStringBuilderDestroy(&sb1);
tfree(segments); tfree(segments);
invalidSqlErrMsg(pCmd, msg); invalidSqlErrMsg(pCmd, msg);
return ret; return ret;
} }
pTableCond->n += xlen; taosStringBuilderAppendString(&sb1, idBuf);
} }
char* str = taosStringBuilderGetResult(&sb1, NULL);
pCmd->tagCond.tbnameCond.cond = strdup(str);
taosStringBuilderDestroy(&sb1);
tfree(segments); tfree(segments);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -4071,10 +4070,9 @@ int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr)
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
/* /*
* tags query condition may be larger than 512bytes, * tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space
* therefore, we need to prepare enough large space
*/ */
char tableNameCond[TSDB_MAX_SQL_LEN] = {0}; SStringBuilder sb = {0};
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if ((ret = getQueryCondExpr(pCmd, pExpr, condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) { if ((ret = getQueryCondExpr(pCmd, pExpr, condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) {
@ -4119,7 +4117,7 @@ int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr)
} }
// 4. get the table name query condition // 4. get the table name query condition
if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, tableNameCond)) != TSDB_CODE_SUCCESS) { if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
@ -4135,7 +4133,10 @@ int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr)
// 7. query condition for table name // 7. query condition for table name
pCmd->tagCond.relType = (condExpr->relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR; pCmd->tagCond.relType = (condExpr->relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, tableNameCond);
ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, &sb);
taosStringBuilderDestroy(&sb);
if (!validateFilterExpr(pCmd)) { if (!validateFilterExpr(pCmd)) {
return invalidSqlErrMsg(pCmd, msg); return invalidSqlErrMsg(pCmd, msg);
} }
@ -5156,7 +5157,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIn
if (pExpr->functionId != TSDB_FUNC_TAG) { if (pExpr->functionId != TSDB_FUNC_TAG) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
int16_t columnInfo = tscGetJoinTagColIndexByUid(pCmd, pMeterMetaInfo->pMeterMeta->uid); int16_t columnInfo = tscGetJoinTagColIndexByUid(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid);
SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo};
SSchema* pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta); SSchema* pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta);

View File

@ -689,7 +689,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId
SSqlExpr *pExpr = tscSqlExprGet(&pNew->cmd, 0); SSqlExpr *pExpr = tscSqlExprGet(&pNew->cmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd, pMeterMetaInfo->pMeterMeta->uid); int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pMeterMetaInfo->pMeterMeta->uid);
pExpr->param->i64Key = tagColIndex; pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
@ -2741,10 +2741,14 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
int32_t n = 0; int32_t n = 0;
for (int32_t i = 0; i < pCmd->tagCond.numOfTagCond; ++i) { for (int32_t i = 0; i < pCmd->tagCond.numOfTagCond; ++i) {
n += pCmd->tagCond.cond[i].cond.n; n += strlen(pCmd->tagCond.cond[i].cond);
} }
int32_t tagLen = n * TSDB_NCHAR_SIZE + pCmd->tagCond.tbnameCond.cond.n * TSDB_NCHAR_SIZE; int32_t tagLen = n * TSDB_NCHAR_SIZE;
if (pCmd->tagCond.tbnameCond.cond != NULL) {
tagLen += strlen(pCmd->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
}
int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2; int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2;
int32_t elemSize = sizeof(SMetricMetaElemMsg) * pCmd->numOfTables; int32_t elemSize = sizeof(SMetricMetaElemMsg) * pCmd->numOfTables;
@ -2816,8 +2820,9 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) {
if (pTagCond->numOfTagCond > 0) { if (pTagCond->numOfTagCond > 0) {
SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid); SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
if (pCond != NULL) { if (pCond != NULL) {
condLen = pCond->cond.n + 1; condLen = strlen(pCond->cond) + 1;
bool ret = taosMbsToUcs4(pCond->cond.z, pCond->cond.n, pMsg, pCond->cond.n * TSDB_NCHAR_SIZE);
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
if (!ret) { if (!ret) {
tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid)); tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid));
return 0; return 0;
@ -2836,15 +2841,17 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) {
offset = pMsg - (char *)pMetaMsg; offset = pMsg - (char *)pMetaMsg;
pElem->tableCond = htonl(offset); pElem->tableCond = htonl(offset);
pElem->tableCondLen = htonl(pTagCond->tbnameCond.cond.n);
uint32_t len = strlen(pTagCond->tbnameCond.cond);
pElem->tableCondLen = htonl(len);
memcpy(pMsg, pTagCond->tbnameCond.cond.z, pTagCond->tbnameCond.cond.n); memcpy(pMsg, pTagCond->tbnameCond.cond, len);
pMsg += pTagCond->tbnameCond.cond.n; pMsg += len;
} }
SSqlGroupbyExpr *pGroupby = &pCmd->groupbyExpr; SSqlGroupbyExpr *pGroupby = &pCmd->groupbyExpr;
if (pGroupby->tableIndex != i) { if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
pElem->orderType = 0; pElem->orderType = 0;
pElem->orderIndex = 0; pElem->orderIndex = 0;
pElem->numOfGroupCols = 0; pElem->numOfGroupCols = 0;

View File

@ -270,7 +270,7 @@ int taos_query(TAOS *taos, const char *sqlstr) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
size_t sqlLen = strlen(sqlstr); size_t sqlLen = strlen(sqlstr);
if (sqlLen > TSDB_MAX_SQL_LEN) { if (sqlLen > tsMaxSQLStringLen) {
pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
@ -786,7 +786,6 @@ int taos_errno(TAOS *taos) {
char *taos_errstr(TAOS *taos) { char *taos_errstr(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
uint8_t code; uint8_t code;
// char temp[256] = {0};
if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode]; if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode];
@ -797,11 +796,13 @@ char *taos_errstr(TAOS *taos) {
// for invalid sql, additional information is attached to explain why the sql is invalid // for invalid sql, additional information is attached to explain why the sql is invalid
if (code == TSDB_CODE_INVALID_SQL) { if (code == TSDB_CODE_INVALID_SQL) {
// snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload);
// strcpy(pObj->pSql->cmd.payload, temp);
return pObj->pSql->cmd.payload; return pObj->pSql->cmd.payload;
} else { } else {
return tsError[code]; if (code < 0 || code > TSDB_CODE_MAX_ERROR_CODE) {
return tsError[TSDB_CODE_SUCCESS];
} else {
return tsError[code];
}
} }
} }
@ -924,7 +925,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
tscTrace("%p Valid SQL: %s pObj:%p", pSql, sql, pObj); tscTrace("%p Valid SQL: %s pObj:%p", pSql, sql, pObj);
int32_t sqlLen = strlen(sql); int32_t sqlLen = strlen(sql);
if (sqlLen > TSDB_MAX_SQL_LEN) { if (sqlLen > tsMaxSQLStringLen) {
tscError("%p sql too long", pSql); tscError("%p sql too long", pSql);
pRes->code = TSDB_CODE_INVALID_SQL; pRes->code = TSDB_CODE_INVALID_SQL;
return pRes->code; return pRes->code;

View File

@ -51,7 +51,6 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) {
assert(len < tListLen(tagIdBuf)); assert(len < tListLen(tagIdBuf));
const int32_t maxKeySize = TSDB_MAX_TAGS_LEN; // allowed max key size const int32_t maxKeySize = TSDB_MAX_TAGS_LEN; // allowed max key size
char* tmp = calloc(1, TSDB_MAX_SQL_LEN);
SCond* cond = tsGetMetricQueryCondPos(pTagCond, uid); SCond* cond = tsGetMetricQueryCondPos(pTagCond, uid);
@ -60,12 +59,24 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) {
sprintf(join, "%s,%s", pTagCond->joinInfo.left.meterId, pTagCond->joinInfo.right.meterId); sprintf(join, "%s,%s", pTagCond->joinInfo.left.meterId, pTagCond->joinInfo.right.meterId);
} }
int32_t keyLen = // estimate the buffer size
snprintf(tmp, TSDB_MAX_SQL_LEN, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name, size_t tbnameCondLen = pTagCond->tbnameCond.cond != NULL? strlen(pTagCond->tbnameCond.cond):0;
(cond != NULL ? cond->cond.z : NULL), pTagCond->tbnameCond.cond.n > 0 ? pTagCond->tbnameCond.cond.z : NULL, size_t redundantLen = 20;
size_t bufSize = strlen(pMeterMetaInfo->name) + tbnameCondLen + strlen(join) + strlen(tagIdBuf);
if (cond != NULL) {
bufSize += strlen(cond->cond);
}
bufSize = (size_t) ((bufSize + redundantLen) * 1.5);
char* tmp = calloc(1, bufSize);
int32_t keyLen = snprintf(tmp, bufSize, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name,
(cond != NULL ? cond->cond : NULL),
(tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL),
pTagCond->relType, join, tagIdBuf, pCmd->groupbyExpr.orderType); pTagCond->relType, join, tagIdBuf, pCmd->groupbyExpr.orderType);
assert(keyLen <= TSDB_MAX_SQL_LEN); assert(keyLen <= bufSize);
if (keyLen < maxKeySize) { if (keyLen < maxKeySize) {
strcpy(str, tmp); strcpy(str, tmp);
@ -99,7 +110,7 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) {
SCond* pDest = &pTagCond->cond[pTagCond->numOfTagCond]; SCond* pDest = &pTagCond->cond[pTagCond->numOfTagCond];
pDest->uid = uid; pDest->uid = uid;
pDest->cond = SStringCreate(str); pDest->cond = strdup(str);
pTagCond->numOfTagCond += 1; pTagCond->numOfTagCond += 1;
} }
@ -1340,14 +1351,20 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId) {
void tscTagCondCopy(STagCond* dest, const STagCond* src) { void tscTagCondCopy(STagCond* dest, const STagCond* src) {
memset(dest, 0, sizeof(STagCond)); memset(dest, 0, sizeof(STagCond));
if (src->tbnameCond.cond != NULL) {
dest->tbnameCond.cond = strdup(src->tbnameCond.cond);
}
SStringCopy(&dest->tbnameCond.cond, &src->tbnameCond.cond);
dest->tbnameCond.uid = src->tbnameCond.uid; dest->tbnameCond.uid = src->tbnameCond.uid;
memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo)); memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo));
for (int32_t i = 0; i < src->numOfTagCond; ++i) { for (int32_t i = 0; i < src->numOfTagCond; ++i) {
SStringCopy(&dest->cond[i].cond, &src->cond[i].cond); if (src->cond[i].cond != NULL) {
dest->cond[i].cond = strdup(src->cond[i].cond);
}
dest->cond[i].uid = src->cond[i].uid; dest->cond[i].uid = src->cond[i].uid;
} }
@ -1356,10 +1373,9 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
} }
void tscTagCondRelease(STagCond* pCond) { void tscTagCondRelease(STagCond* pCond) {
SStringFree(&pCond->tbnameCond.cond); free(pCond->tbnameCond.cond);
for (int32_t i = 0; i < pCond->numOfTagCond; ++i) { for (int32_t i = 0; i < pCond->numOfTagCond; ++i) {
SStringFree(&pCond->cond[i].cond); free(pCond->cond[i].cond);
} }
memset(pCond, 0, sizeof(STagCond)); memset(pCond, 0, sizeof(STagCond));
@ -1571,123 +1587,6 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0; pRes->numOfRows = 0;
} }
SString SStringCreate(const char* str) {
size_t len = strlen(str);
SString dest = {.n = len, .alloc = len + 1};
dest.z = calloc(1, dest.alloc);
strcpy(dest.z, str);
return dest;
}
void SStringCopy(SString* pDest, const SString* pSrc) {
if (pSrc->n > 0) {
pDest->n = pSrc->n;
pDest->alloc = pDest->n + 1; // one additional space for null terminate
pDest->z = calloc(1, pDest->alloc);
memcpy(pDest->z, pSrc->z, pDest->n);
} else {
memset(pDest, 0, sizeof(SString));
}
}
void SStringFree(SString* pStr) {
if (pStr->alloc > 0) {
tfree(pStr->z);
pStr->alloc = 0;
}
}
void SStringShrink(SString* pStr) {
if (pStr->alloc > (pStr->n + 1) && pStr->alloc > (pStr->n * 2)) {
pStr->z = realloc(pStr->z, pStr->n + 1);
assert(pStr->z != NULL);
pStr->alloc = pStr->n + 1;
}
}
int32_t SStringAlloc(SString* pStr, int32_t size) {
if (pStr->alloc >= size) {
return TSDB_CODE_SUCCESS;
}
size = ALIGN8(size);
char* tmp = NULL;
if (pStr->z != NULL) {
tmp = realloc(pStr->z, size);
memset(pStr->z + pStr->n, 0, size - pStr->n);
} else {
tmp = calloc(1, size);
}
if (tmp == NULL) {
#ifdef WINDOWS
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
tscTrace("failed to allocate memory, reason:%s", lpMsgBuf);
LocalFree(lpMsgBuf);
#else
char errmsg[256] = {0};
strerror_r(errno, errmsg, tListLen(errmsg));
tscTrace("failed to allocate memory, reason:%s", errmsg);
#endif
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pStr->z = tmp;
pStr->alloc = size;
return TSDB_CODE_SUCCESS;
}
#define MIN_ALLOC_SIZE 8
int32_t SStringEnsureRemain(SString* pStr, int32_t size) {
if (pStr->alloc - pStr->n > size) {
return TSDB_CODE_SUCCESS;
}
// remain space is insufficient, allocate more spaces
int32_t inc = (size >= MIN_ALLOC_SIZE) ? size : MIN_ALLOC_SIZE;
if (inc < (pStr->alloc >> 1)) {
inc = (pStr->alloc >> 1);
}
// get the new size
int32_t newsize = pStr->alloc + inc;
char* tmp = realloc(pStr->z, newsize);
if (tmp == NULL) {
#ifdef WINDOWS
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
tscTrace("failed to allocate memory, reason:%s", lpMsgBuf);
LocalFree(lpMsgBuf);
#else
char errmsg[256] = {0};
strerror_r(errno, errmsg, tListLen(errmsg));
tscTrace("failed to allocate memory, reason:%s", errmsg);
#endif
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
memset(tmp + pStr->n, 0, inc);
pStr->alloc = newsize;
pStr->z = tmp;
return TSDB_CODE_SUCCESS;
}
SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param,
SSqlObj* pPrevSql) { SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
@ -1822,9 +1721,7 @@ void tscDoQuery(SSqlObj* pSql) {
} }
} }
int16_t tscGetJoinTagColIndexByUid(SSqlCmd* pCmd, uint64_t uid) { int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) {
STagCond* pTagCond = &pCmd->tagCond;
if (pTagCond->joinInfo.left.uid == uid) { if (pTagCond->joinInfo.left.uid == uid) {
return pTagCond->joinInfo.left.tagCol; return pTagCond->joinInfo.left.tagCol;
} else { } else {

View File

@ -128,7 +128,7 @@ extern "C" {
#define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered #define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered
#define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered #define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered
#define TSDB_CODE_INVALID_COMMIT_LOG 109 // commit log init failed #define TSDB_CODE_INVALID_COMMIT_LOG 109 // commit log init failed
#define TSDB_CODE_SERVER_NO_SPACE 110 #define TSDB_CODE_SERV_NO_DISKSPACE 110
#define TSDB_CODE_NOT_SUPER_TABLE 111 // operation only available for super table #define TSDB_CODE_NOT_SUPER_TABLE 111 // operation only available for super table
#define TSDB_CODE_DUPLICATE_TAGS 112 // tags value for join not unique #define TSDB_CODE_DUPLICATE_TAGS 112 // tags value for join not unique
#define TSDB_CODE_INVALID_SUBMIT_MSG 113 #define TSDB_CODE_INVALID_SUBMIT_MSG 113
@ -137,6 +137,8 @@ extern "C" {
#define TSDB_CODE_INVALID_VNODE_STATUS 116 #define TSDB_CODE_INVALID_VNODE_STATUS 116
#define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117 #define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117
#define TSDB_CODE_MAX_ERROR_CODE 118
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -106,7 +106,6 @@ extern int tsMaxDbs;
extern int tsMaxTables; extern int tsMaxTables;
extern int tsMaxDnodes; extern int tsMaxDnodes;
extern int tsMaxVGroups; extern int tsMaxVGroups;
extern int tsShellActivityTimer;
extern char tsMgmtZone[]; extern char tsMgmtZone[];
extern char tsLocalIp[]; extern char tsLocalIp[];
@ -127,6 +126,7 @@ extern int tsEnableHttpModule;
extern int tsEnableMonitorModule; extern int tsEnableMonitorModule;
extern int tsRestRowLimit; extern int tsRestRowLimit;
extern int tsCompressMsgSize; extern int tsCompressMsgSize;
extern int tsMaxSQLStringLen;
extern char tsSocketType[4]; extern char tsSocketType[4];

View File

@ -100,6 +100,7 @@ extern "C" {
#define TSDB_COL_NAME_LEN 64 #define TSDB_COL_NAME_LEN 64
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 16 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 16
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 6mb
#define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 16 #define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 16
#define TSDB_MAX_TAGS_LEN 512 #define TSDB_MAX_TAGS_LEN 512

View File

@ -238,7 +238,7 @@ char *tsError[] = {"success",
"only super table has metric meta info", "only super table has metric meta info",
"tags value not unique for join", "tags value not unique for join",
"invalid submit message", "invalid submit message",
"not active table(not created yet or deleted already)", //114 "not active table(not created yet or dropped already)", //114
"invalid table id", "invalid table id",
"invalid vnode status", //116 "invalid vnode status", //116
"failed to lock resources", "failed to lock resources",

View File

@ -27,7 +27,13 @@ extern "C" {
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query)) #define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
/*
* set the output buffer page size is 16k
* The page size should be sufficient for at least one output result or intermediate result.
* Some intermediate results may be extremely large, such as top/bottom(100) query.
*/
#define DEFAULT_INTERN_BUF_SIZE 16384L #define DEFAULT_INTERN_BUF_SIZE 16384L
#define INIT_ALLOCATE_DISK_PAGES 60L #define INIT_ALLOCATE_DISK_PAGES 60L
#define DEFAULT_DATA_FILE_MAPPING_PAGES 2L #define DEFAULT_DATA_FILE_MAPPING_PAGES 2L
#define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE) #define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE)
@ -160,7 +166,7 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport);
void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport); void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport);
int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position); int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position);
void doCloseAllOpenedResults(SMeterQuerySupportObj* pSupporter); int32_t doCloseAllOpenedResults(SMeterQuerySupportObj* pSupporter);
void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
@ -185,7 +191,7 @@ void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len);
void setExecutionContext(SMeterQuerySupportObj* pSupporter, SOutputRes* outputRes, int32_t meterIdx, int32_t groupIdx, void setExecutionContext(SMeterQuerySupportObj* pSupporter, SOutputRes* outputRes, int32_t meterIdx, int32_t groupIdx,
SMeterQueryInfo* sqinfo); SMeterQueryInfo* sqinfo);
void setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo); int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo);
int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange); int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange);
int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus); int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus);
@ -224,11 +230,11 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
/** /**
* add the new allocated disk page to meter query info * add the new allocated disk page to meter query info
* the new allocated disk page is used to keep the intermediate (interval) results * the new allocated disk page is used to keep the intermediate (interval) results
* * @param pQuery
* @param pMeterQueryInfo * @param pMeterQueryInfo
* @param pSupporter * @param pSupporter
*/ */
tFilePage* addDataPageForMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter); tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter);
/** /**
* save the query range data into SMeterQueryInfo * save the query range data into SMeterQueryInfo

View File

@ -67,13 +67,13 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields, SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields,
__block_search_fn_t searchFn); __block_search_fn_t searchFn);
static void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data,
int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus,
SField *pFields, __block_search_fn_t searchFn); SField *pFields, __block_search_fn_t searchFn);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx);
static void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv); const SQueryRuntimeEnv *pRuntimeEnv);
static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
@ -413,7 +413,7 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int
vnodeSetOpenedFileNames(pVnodeFileInfo); vnodeSetOpenedFileNames(pVnodeFileInfo);
if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) { if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) {
doCloseOpenedFileData(pVnodeFileInfo); // there may be partially open fd, close it anyway. doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway.
return pVnodeFileInfo->pHeaderFileData; return pVnodeFileInfo->pHeaderFileData;
} }
} }
@ -1291,9 +1291,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
// if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
// continue;
// }
SField dummyField = {0}; SField dummyField = {0};
@ -3052,7 +3049,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) { if (pDir == NULL) {
dError("QInfo:%p failed to open directory:%s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix); dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix, strerror(errno));
return; return;
} }
@ -3920,11 +3917,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
// set 4k page for each meter
pSupporter->numOfPages = pSupporter->numOfMeters; pSupporter->numOfPages = pSupporter->numOfMeters;
ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
if (ret != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
strerror(errno));
return TSDB_CODE_SERV_NO_DISKSPACE;
}
pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
pSupporter->lastPageId = -1; pSupporter->lastPageId = -1;
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
@ -3932,7 +3934,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pSupporter->meterOutputMMapBuf = pSupporter->meterOutputMMapBuf =
mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
dError("QInfo:%p failed to map data file: %s to disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
} }
@ -4733,20 +4735,24 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t ret = TSDB_CODE_SUCCESS;
while (pSupporter->subgroupIdx < pSupporter->pSidSet->numOfSubSet) { while (pSupporter->subgroupIdx < pSupporter->pSidSet->numOfSubSet) {
int32_t start = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx]; int32_t start = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx];
int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1]; int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1];
int32_t ret = ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end);
doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); if (ret < 0) { // not enough disk space to save the data into disk
return -1;
}
pSupporter->subgroupIdx += 1; pSupporter->subgroupIdx += 1;
/* this group generates at least one result, return results */ // this group generates at least one result, return results
if (ret > 0) { if (ret > 0) {
break; break;
} }
assert(pSupporter->numOfGroupResultPages == 0); assert(pSupporter->numOfGroupResultPages == 0);
dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1); dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1);
} }
@ -4754,7 +4760,7 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) {
dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery), dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
pSupporter->subgroupIdx - 1, pSupporter->pSidSet->numOfSubSet, taosGetTimestampMs() - st); pSupporter->subgroupIdx - 1, pSupporter->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
return pSupporter->numOfGroupResultPages; return TSDB_CODE_SUCCESS;
} }
void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) { void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) {
@ -4762,7 +4768,9 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
pSupporter->numOfGroupResultPages = 0; pSupporter->numOfGroupResultPages = 0;
// current results of group has been sent to client, try next group // current results of group has been sent to client, try next group
mergeMetersResultToOneGroups(pSupporter); if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) {
return; // failed to save data in the disk
}
// set current query completed // set current query completed
if (pSupporter->numOfGroupResultPages == 0 && pSupporter->subgroupIdx == pSupporter->pSidSet->numOfSubSet) { if (pSupporter->numOfGroupResultPages == 0 && pSupporter->subgroupIdx == pSupporter->pSidSet->numOfSubSet) {
@ -4840,7 +4848,10 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
} else { } else {
// copy data to disk buffer // copy data to disk buffer
if (buffer[0]->numOfElems == pQuery->pointsToRead) { if (buffer[0]->numOfElems == pQuery->pointsToRead) {
flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv); if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
return -1;
}
resetMergeResultBuf(pQuery, pCtx); resetMergeResultBuf(pQuery, pCtx);
} }
@ -4887,7 +4898,14 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
} }
if (buffer[0]->numOfElems != 0) { // there are data in buffer if (buffer[0]->numOfElems != 0) { // there are data in buffer
flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv); if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), pSupporter->extBufFile);
tfree(pTree);
tfree(pValidMeter);
tfree(posArray);
return -1;
}
} }
int64_t endt = taosGetTimestampMs(); int64_t endt = taosGetTimestampMs();
@ -4906,25 +4924,44 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
return pSupporter->numOfGroupResultPages; return pSupporter->numOfGroupResultPages;
} }
static void extendDiskBuf(SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) {
assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize);
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pQuery);
int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
pSupporter->numOfPages = numOfPages; pSupporter->numOfPages = numOfPages;
// disk-based output buffer is exhausted, try to extend the disk-based buffer /*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
if (ret != 0) { if (ret != 0) {
perror("error in allocate the disk-based buffer"); dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
return; strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE;
pQInfo->killed = 1;
return pQInfo->code;
} }
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
pSupporter->meterOutputMMapBuf = pSupporter->meterOutputMMapBuf =
mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
pQInfo->killed = 1;
return pQInfo->code;
}
return TSDB_CODE_SUCCESS;
} }
void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1;
int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE +
pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1);
@ -4935,7 +4972,9 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
requiredPages += pSupporter->numOfMeters; requiredPages += pSupporter->numOfMeters;
} }
extendDiskBuf(pSupporter, requiredPages); if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) {
return -1;
}
} }
char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages + char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages +
@ -4949,6 +4988,7 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
} }
pSupporter->numOfGroupResultPages += 1; pSupporter->numOfGroupResultPages += 1;
return TSDB_CODE_SUCCESS;
} }
void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx) { void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx) {
@ -4966,7 +5006,7 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3
pMeterDataInfo->meterOrderIdx = meterIdx; pMeterDataInfo->meterOrderIdx = meterIdx;
} }
void doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
@ -4980,11 +5020,20 @@ void doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
pRuntimeEnv->pMeterObj = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[index]->sid); pRuntimeEnv->pMeterObj = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[index]->sid);
assert(pRuntimeEnv->pMeterObj == pMeterInfo[i].pMeterObj); assert(pRuntimeEnv->pMeterObj == pMeterInfo[i].pMeterObj);
setIntervalQueryExecutionContext(pSupporter, i, pMeterInfo[i].pMeterQInfo); int32_t ret = setIntervalQueryExecutionContext(pSupporter, i, pMeterInfo[i].pMeterQInfo);
saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
} }
} }
} }
return TSDB_CODE_SUCCESS;
} }
void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
@ -5690,18 +5739,24 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
} }
} }
static tFilePage *allocNewPage(SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) {
if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { if (pSupporter->lastPageId == pSupporter->numOfPages - 1) {
extendDiskBuf(pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters); if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) {
return NULL;
}
} }
*pageId = (++pSupporter->lastPageId); *pageId = (++pSupporter->lastPageId);
return getFilePage(pSupporter, *pageId); return getFilePage(pSupporter, *pageId);
} }
tFilePage *addDataPageForMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { tFilePage *addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) {
uint32_t pageId = 0; uint32_t pageId = 0;
tFilePage *pPage = allocNewPage(pSupporter, &pageId);
tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId);
if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results
return NULL;
}
if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) { if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) {
pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1; pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1;
@ -6199,46 +6254,53 @@ void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t
} }
} }
void setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
tFilePage * pData = NULL; tFilePage * pData = NULL;
SQuery* pQuery = pRuntimeEnv->pQuery;
// in the first scan, new space needed for results // in the first scan, new space needed for results
if (pMeterQueryInfo->numOfPages == 0) { if (pMeterQueryInfo->numOfPages == 0) {
pData = addDataPageForMeterQueryInfo(pMeterQueryInfo, pSupporter); pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter);
} else { } else {
int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]; int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1];
pData = getFilePage(pSupporter, lastPageId); pData = getFilePage(pSupporter, lastPageId);
if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) {
pData = addDataPageForMeterQueryInfo(pMeterQueryInfo, pSupporter); pData = addDataPageForMeterQueryInfo(pRuntimeEnv->pQuery, pMeterQueryInfo, pSupporter);
assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer if (pData != NULL) {
assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer
}
} }
} }
if (pData == NULL) {
return -1;
}
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i); pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i);
pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i]; pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i];
} }
return TSDB_CODE_SUCCESS;
} }
void setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx,
SMeterQueryInfo *pMeterQueryInfo) { SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (IS_MASTER_SCAN(pRuntimeEnv)) { if (IS_MASTER_SCAN(pRuntimeEnv)) {
setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) {
// not enough disk space or memory buffer for intermediate results
return -1;
}
if (pMeterQueryInfo->lastResRows == 0) { if (pMeterQueryInfo->lastResRows == 0) {
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
} }
// reset the number of iterated elements, once this function is called. since the pCtx for different
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
// pRuntimeEnv->pCtx[j].numOfIteratedElems = 0;
}
} else { } else {
if (pMeterQueryInfo->reverseFillRes) { if (pMeterQueryInfo->reverseFillRes) {
setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo);
@ -6249,7 +6311,9 @@ void setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t
* *
* If the master scan does not produce any results, new spaces needed to be allocated during supplement scan * If the master scan does not produce any results, new spaces needed to be allocated during supplement scan
*/ */
setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) {
return -1;
}
} }
} }
@ -6659,14 +6723,14 @@ static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo
} }
} }
void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult) { int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
// no results generated, do nothing for master scan // no results generated, do nothing for master scan
if (numOfResult <= 0) { if (numOfResult <= 0) {
if (IS_MASTER_SCAN(pRuntimeEnv)) { if (IS_MASTER_SCAN(pRuntimeEnv)) {
return; return TSDB_CODE_SUCCESS;
} else { } else {
/* /*
* There is a case that no result generated during the the supplement scan, and during the main * There is a case that no result generated during the the supplement scan, and during the main
@ -6691,7 +6755,7 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI
setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo);
} }
return; return TSDB_CODE_SUCCESS;
} }
} }
@ -6720,7 +6784,9 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI
pMeterQueryInfo->numOfRes += numOfResult; pMeterQueryInfo->numOfRes += numOfResult;
assert(pData->numOfElems <= pRuntimeEnv->numOfRowsPerPage); assert(pData->numOfElems <= pRuntimeEnv->numOfRowsPerPage);
setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) {
return -1;
}
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
resetResultInfo(&pMeterQueryInfo->resultInfo[i]); resetResultInfo(&pMeterQueryInfo->resultInfo[i]);
@ -6743,6 +6809,8 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI
tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage); tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage);
#endif #endif
} }
return TSDB_CODE_SUCCESS;
} }
static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) { static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) {

View File

@ -157,7 +157,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
setExecutionContext(pSupporter, pSupporter->pResult, k, pMeterInfo[k].groupIdx, pMeterQueryInfo); setExecutionContext(pSupporter, pSupporter->pResult, k, pMeterInfo[k].groupIdx, pMeterQueryInfo);
} else { } else {
setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo); int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->killed = 1;
return NULL;
}
} }
qTrace("QInfo:%p vid:%d sid:%d id:%s, query in cache, qrange:%lld-%lld, lastKey:%lld", pQInfo, pMeterObj->vnode, qTrace("QInfo:%p vid:%d sid:%d id:%s, query in cache, qrange:%lld-%lld, lastKey:%lld", pQInfo, pMeterObj->vnode,
@ -306,7 +310,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
if (pReqMeterDataInfo == NULL) { if (pReqMeterDataInfo == NULL) {
dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo);
pQInfo->code = TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
pQInfo->killed = 1; pQInfo->killed = 1;
return NULL; return NULL;
} }
@ -338,7 +342,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo);
tfree(pReqMeterDataInfo); tfree(pReqMeterDataInfo);
pQInfo->code = TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
pQInfo->killed = 1; pQInfo->killed = 1;
return NULL; return NULL;
} }
@ -393,7 +397,12 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
setExecutionContext(pSupporter, pSupporter->pResult, pOneMeterDataInfo->meterOrderIdx, setExecutionContext(pSupporter, pSupporter->pResult, pOneMeterDataInfo->meterOrderIdx,
pOneMeterDataInfo->groupIdx, pMeterQueryInfo); pOneMeterDataInfo->groupIdx, pMeterQueryInfo);
} else { // interval query } else { // interval query
setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); int32_t ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
tfree(pReqMeterDataInfo); // error code has been set
pQInfo->killed = 1;
return NULL;
}
} }
SCompBlock *pBlock = pInfoEx->pBlock.compBlock; SCompBlock *pBlock = pInfoEx->pBlock.compBlock;
@ -900,7 +909,12 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st, dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st,
pQuery->order.order ^ 1); pQuery->order.order ^ 1);
doCloseAllOpenedResults(pSupporter); // failed to save all intermediate results into disk, abort further query processing
if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo);
return;
}
doMultiMeterSupplementaryScan(pQInfo); doMultiMeterSupplementaryScan(pQInfo);
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
@ -911,12 +925,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
if (pQuery->nAggTimeInterval > 0) { if (pQuery->nAggTimeInterval > 0) {
assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0);
mergeMetersResultToOneGroups(pSupporter); if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pSupporter, pQuery); copyResToQueryResultBuf(pSupporter, pQuery);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len);
#endif #endif
}
} else { // not a interval query } else { // not a interval query
copyFromGroupBuf(pQInfo, pSupporter->pResult); copyFromGroupBuf(pQInfo, pSupporter->pResult);
} }

View File

@ -824,11 +824,11 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t *
} }
if (pQInfo->killed) { if (pQInfo->killed) {
dTrace("QInfo:%p it is already killed, %p, code:%d", pQInfo, pQuery, pQInfo->code); dTrace("QInfo:%p query is killed, %p, code:%d", pQInfo, pQuery, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) { if (pQInfo->code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_QUERY_CANCELLED; return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client } else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return pQInfo->code; return abs(pQInfo->code);
} }
} }
@ -837,8 +837,13 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t *
*rowSize = pQuery->rowSize; *rowSize = pQuery->rowSize;
*timePrec = vnodeList[pQInfo->pObj->vnode].cfg.precision; *timePrec = vnodeList[pQInfo->pObj->vnode].cfg.precision;
if (pQInfo->code < 0) return -pQInfo->code; dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec,
*rowSize, *numOfRows, pQInfo->code);
if (pQInfo->code < 0) { // less than 0 means there are error existed.
return -pQInfo->code;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -606,7 +606,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (tsAvailDataDirGB < tsMinimalDataDirGB) { if (tsAvailDataDirGB < tsMinimalDataDirGB) {
dError("server disk space remain %.3f GB, need at least %.3f GB, stop writing", tsAvailDataDirGB, tsMinimalDataDirGB); dError("server disk space remain %.3f GB, need at least %.3f GB, stop writing", tsAvailDataDirGB, tsMinimalDataDirGB);
code = TSDB_CODE_SERVER_NO_SPACE; code = TSDB_CODE_SERV_NO_DISKSPACE;
goto _submit_over; goto _submit_over;
} }

View File

@ -124,6 +124,7 @@ int tsMgmtEqualVnodeNum = 0;
int tsEnableHttpModule = 1; int tsEnableHttpModule = 1;
int tsEnableMonitorModule = 1; int tsEnableMonitorModule = 1;
int tsRestRowLimit = 10240; int tsRestRowLimit = 10240;
int tsMaxSQLStringLen = TSDB_MAX_SQL_LEN;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -653,7 +654,11 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "compressMsgSize", &tsCompressMsgSize, TSDB_CFG_VTYPE_INT, tsInitConfigOption(cfg++, "compressMsgSize", &tsCompressMsgSize, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW,
-1, 10000000, 0, TSDB_CFG_UTYPE_NONE); -1, 10000000, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "maxSQLLength", &tsMaxSQLStringLen, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW,
TSDB_MAX_SQL_LEN, TSDB_MAX_ALLOWED_SQL_LEN, 0, TSDB_CFG_UTYPE_BYTE);
// locale & charset // locale & charset
tsInitConfigOption(cfg++, "timezone", tsTimezone, TSDB_CFG_VTYPE_STRING, tsInitConfigOption(cfg++, "timezone", tsTimezone, TSDB_CFG_VTYPE_STRING,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT,

View File

@ -13,10 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "tstrbuild.h" #include "tstrbuild.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
void taosStringBuilderEnsureCapacity(SStringBuilder* sb, size_t size) { void taosStringBuilderEnsureCapacity(SStringBuilder* sb, size_t size) {
size += sb->pos; size += sb->pos;