[td-225]fix bug found in test script
This commit is contained in:
parent
656128d9cf
commit
988400dc43
|
@ -1909,7 +1909,8 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
|
|||
__ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey};
|
||||
}
|
||||
|
||||
tVariantDump(&pTagInfo->pTagCtxList[i]->tag, dst->pTags + size, pTagInfo->pTagCtxList[i]->tag.nType);
|
||||
//todo? error ??
|
||||
tVariantDump(&pTagInfo->pTagCtxList[i]->tag, dst->pTags + size, pTagInfo->pTagCtxList[i]->tag.nType, false);
|
||||
size += pTagInfo->pTagCtxList[i]->outputBytes;
|
||||
}
|
||||
}
|
||||
|
@ -2981,14 +2982,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
|
|||
assert(pCtx->inputBytes == pCtx->outputBytes);
|
||||
|
||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||
char* output = pCtx->aOutputBuf;
|
||||
|
||||
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
|
||||
varDataSetLen(output, pCtx->tag.nLen);
|
||||
tVariantDump(&pCtx->tag, varDataVal(output), pCtx->outputType);
|
||||
} else {
|
||||
tVariantDump(&pCtx->tag, output, pCtx->outputType);
|
||||
}
|
||||
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType, true);
|
||||
|
||||
pCtx->aOutputBuf += pCtx->outputBytes;
|
||||
}
|
||||
|
@ -2997,14 +2991,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
|
|||
static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
INC_INIT_VAL(pCtx, 1);
|
||||
|
||||
char* output = pCtx->aOutputBuf;
|
||||
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
|
||||
*(int16_t*) output = pCtx->tag.nLen;
|
||||
output += VARSTR_HEADER_SIZE;
|
||||
}
|
||||
|
||||
// todo : handle the binary/nchar data
|
||||
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
|
||||
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType, true);
|
||||
pCtx->aOutputBuf += pCtx->outputBytes;
|
||||
}
|
||||
|
||||
|
@ -3017,30 +3004,12 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
*/
|
||||
static void tag_function(SQLFunctionCtx *pCtx) {
|
||||
SET_VAL(pCtx, 1, 1);
|
||||
|
||||
char* output = pCtx->aOutputBuf;
|
||||
|
||||
// todo refactor to dump length presented string(var string)
|
||||
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
|
||||
*(int16_t*) output = pCtx->tag.nLen;
|
||||
output += VARSTR_HEADER_SIZE;
|
||||
}
|
||||
|
||||
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
|
||||
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType, true);
|
||||
}
|
||||
|
||||
static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
SET_VAL(pCtx, 1, 1);
|
||||
|
||||
char* output = pCtx->aOutputBuf;
|
||||
|
||||
// todo refactor to dump length presented string(var string)
|
||||
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
|
||||
*(int16_t*) output = pCtx->tag.nLen;
|
||||
output += VARSTR_HEADER_SIZE;
|
||||
}
|
||||
|
||||
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
|
||||
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType, true);
|
||||
}
|
||||
|
||||
static void copy_function(SQLFunctionCtx *pCtx) {
|
||||
|
@ -3991,7 +3960,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
|
|||
|
||||
SET_VAL(pCtx, pCtx->size, 1);
|
||||
} else if (pInfo->type == TSDB_FILL_SET_VALUE) {
|
||||
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType);
|
||||
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType, true);
|
||||
} else if (pInfo->type == TSDB_FILL_PREV) {
|
||||
char *data = GET_INPUT_CHAR_INDEX(pCtx, 0);
|
||||
assignVal(pCtx->aOutputBuf, data, pCtx->outputBytes, pCtx->outputType);
|
||||
|
|
|
@ -142,7 +142,7 @@ static int setColumnFilterInfoForTimestamp(SQueryInfo* pQueryInfo, tVariant* pVa
|
|||
return invalidSqlErrMsg(pQueryInfo->msg, msg);
|
||||
}
|
||||
} else {
|
||||
if (tVariantDump(pVar, (char*)&time, TSDB_DATA_TYPE_BIGINT)) {
|
||||
if (tVariantDump(pVar, (char*)&time, TSDB_DATA_TYPE_BIGINT, true)) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg);
|
||||
}
|
||||
}
|
||||
|
@ -1628,14 +1628,14 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
if (optr == TK_LEASTSQUARES) {
|
||||
/* set the leastsquares parameters */
|
||||
char val[8] = {0};
|
||||
if (tVariantDump(&pParamElem[1].pNode->val, val, TSDB_DATA_TYPE_DOUBLE) < 0) {
|
||||
if (tVariantDump(&pParamElem[1].pNode->val, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES, 0);
|
||||
|
||||
memset(val, 0, tListLen(val));
|
||||
if (tVariantDump(&pParamElem[2].pNode->val, val, TSDB_DATA_TYPE_DOUBLE) < 0) {
|
||||
if (tVariantDump(&pParamElem[2].pNode->val, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -1795,7 +1795,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
SSqlExpr* pExpr = NULL;
|
||||
|
||||
if (optr == TK_PERCENTILE || optr == TK_APERCENTILE) {
|
||||
tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE);
|
||||
tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true);
|
||||
|
||||
double dp = GET_DOUBLE_VAL(val);
|
||||
if (dp < 0 || dp > TOP_BOTTOM_QUERY_LIMIT) {
|
||||
|
@ -1818,7 +1818,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, resultSize, false);
|
||||
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0);
|
||||
} else {
|
||||
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT);
|
||||
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
|
||||
|
||||
int64_t nTop = *((int32_t*)val);
|
||||
if (nTop <= 0 || nTop > 100) { // todo use macro
|
||||
|
@ -2631,23 +2631,23 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn
|
|||
}
|
||||
|
||||
if (pExpr->nSQLOptr == TK_LE || pExpr->nSQLOptr == TK_LT) {
|
||||
tVariantDump(&pRight->val, (char*)&pColumnFilter->upperBndd, colType);
|
||||
tVariantDump(&pRight->val, (char*)&pColumnFilter->upperBndd, colType, false);
|
||||
} else { // TK_GT,TK_GE,TK_EQ,TK_NE are based on the pColumn->lowerBndd
|
||||
if (colType == TSDB_DATA_TYPE_BINARY) {
|
||||
pColumnFilter->pz = (int64_t)calloc(1, pRight->val.nLen + 1);
|
||||
pColumnFilter->len = pRight->val.nLen;
|
||||
|
||||
tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType);
|
||||
tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType, false);
|
||||
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
|
||||
// pRight->val.nLen + 1 is larger than the actual nchar string length
|
||||
pColumnFilter->pz = (int64_t)calloc(1, (pRight->val.nLen + 1) * TSDB_NCHAR_SIZE);
|
||||
|
||||
tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType);
|
||||
tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType, false);
|
||||
|
||||
size_t len = wcslen((wchar_t*)pColumnFilter->pz);
|
||||
pColumnFilter->len = len * TSDB_NCHAR_SIZE;
|
||||
} else {
|
||||
tVariantDump(&pRight->val, (char*)&pColumnFilter->lowerBndd, colType);
|
||||
tVariantDump(&pRight->val, (char*)&pColumnFilter->lowerBndd, colType, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3336,9 +3336,8 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S
|
|||
|
||||
*pExpr = NULL; // remove this expression
|
||||
*type = TSQL_EXPR_TS;
|
||||
} else if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) ||
|
||||
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { // query on tags
|
||||
// check for tag query condition
|
||||
} else if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) || index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
// query on tags, check for tag query condition
|
||||
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
|
||||
}
|
||||
|
@ -3933,7 +3932,7 @@ int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t t
|
|||
* failed to parse timestamp in regular formation, try next
|
||||
* it may be a epoch time in string format
|
||||
*/
|
||||
tVariantDump(&pRight->val, (char*)&val, TSDB_DATA_TYPE_BIGINT);
|
||||
tVariantDump(&pRight->val, (char*)&val, TSDB_DATA_TYPE_BIGINT, true);
|
||||
|
||||
/*
|
||||
* transfer it into MICROSECOND format if it is a string, since for
|
||||
|
@ -4070,14 +4069,13 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->fillVal[i], pFields->type);
|
||||
int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->fillVal[i], pFields->type, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg);
|
||||
}
|
||||
}
|
||||
|
||||
if ((pFillToken->nExpr < size) ||
|
||||
((pFillToken->nExpr - 1 < size) && (tscIsPointInterpQuery(pQueryInfo)))) {
|
||||
if ((pFillToken->nExpr < size) || ((pFillToken->nExpr - 1 < size) && (tscIsPointInterpQuery(pQueryInfo)))) {
|
||||
tVariantListItem* lastItem = &pFillToken->a[pFillToken->nExpr - 1];
|
||||
|
||||
for (int32_t i = numOfFillVal; i < size; ++i) {
|
||||
|
@ -4086,7 +4084,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
|
|||
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
|
||||
} else {
|
||||
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pFields->type);
|
||||
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pFields->type, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4420,10 +4418,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
}
|
||||
|
||||
SSchema* pTagsSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, columnIndex.columnIndex);
|
||||
if (tVariantDump(&pVarList->a[1].pVar, pAlterSQL->tagData.data /*pCmd->payload*/, pTagsSchema->type) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
if (tVariantDump(&pVarList->a[1].pVar, pAlterSQL->tagData.data, pTagsSchema->type, true) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg13);
|
||||
}
|
||||
|
||||
pAlterSQL->tagData.dataLen = pTagsSchema->bytes;
|
||||
|
||||
// validate the length of binary
|
||||
|
@ -5571,21 +5569,9 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pTagSchema[i].bytes) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
ret = tVariantDump(&(pList->a[i].pVar), varDataVal(tagVal), pTagSchema[i].type);
|
||||
if (pList->a[i].pVar.nType == TSDB_DATA_TYPE_NULL) {
|
||||
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY) {
|
||||
varDataSetLen(tagVal, sizeof(uint8_t));
|
||||
} else {
|
||||
varDataSetLen(tagVal, sizeof(uint32_t));
|
||||
}
|
||||
} else { // todo refactor
|
||||
varDataSetLen(tagVal, pList->a[i].pVar.nLen);
|
||||
}
|
||||
} else {
|
||||
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pTagSchema[i].type);
|
||||
}
|
||||
|
||||
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pTagSchema[i].type, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
|
|
@ -1289,7 +1289,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
|
||||
SSchema *pSchema = pAlterTableMsg->schema;
|
||||
for (int i = 0; i < pAlterTableMsg->numOfCols; ++i) {
|
||||
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
|
||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
|
||||
|
||||
pSchema->type = pField->type;
|
||||
|
|
|
@ -1765,6 +1765,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
pSql->pSubs[i] = pNew;
|
||||
pNew->fetchFp = pNew->fp;
|
||||
|
||||
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
|
||||
}
|
||||
|
||||
|
|
|
@ -196,6 +196,7 @@ void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) {
|
|||
STagCol key = {colId,0,0};
|
||||
STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ);
|
||||
if (NULL == stCol) {
|
||||
type = TSDB_DATA_TYPE_NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,35 @@ const int32_t TYPE_BYTES[11] = {
|
|||
sizeof(VarDataOffsetT) // TSDB_DATA_TYPE_NCHAR
|
||||
};
|
||||
|
||||
static void getStatics_bool(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
|
||||
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
|
||||
int8_t *data = (int8_t *)pData;
|
||||
*min = INT64_MAX;
|
||||
*max = INT64_MIN;
|
||||
*minIndex = 0;
|
||||
*maxIndex = 0;
|
||||
|
||||
ASSERT(numOfRow <= INT16_MAX);
|
||||
|
||||
for (int32_t i = 0; i < numOfRow; ++i) {
|
||||
if (isNull((char *)&data[i], TSDB_DATA_TYPE_BOOL)) {
|
||||
(*numOfNull) += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
*sum += data[i];
|
||||
if (*min > data[i]) {
|
||||
*min = data[i];
|
||||
*minIndex = i;
|
||||
}
|
||||
|
||||
if (*max < data[i]) {
|
||||
*max = data[i];
|
||||
*maxIndex = i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void getStatics_i8(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
|
||||
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
|
||||
int8_t *data = (int8_t *)pData;
|
||||
|
@ -131,15 +160,6 @@ static void getStatics_i32(const TSKEY *primaryKey, const void *pData, int32_t n
|
|||
*max = data[i];
|
||||
*maxIndex = i;
|
||||
}
|
||||
|
||||
// if (isNull(&lastVal, TSDB_DATA_TYPE_INT)) {
|
||||
// lastKey = primaryKey[i];
|
||||
// lastVal = data[i];
|
||||
// } else {
|
||||
// *wsum = lastVal * (primaryKey[i] - lastKey);
|
||||
// lastKey = primaryKey[i];
|
||||
// lastVal = data[i];
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,11 +299,11 @@ static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t n
|
|||
ASSERT(numOfRow <= INT16_MAX);
|
||||
|
||||
for (int32_t i = 0; i < numOfRow; ++i) {
|
||||
if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_BINARY)) {
|
||||
if (isNull(data, TSDB_DATA_TYPE_BINARY)) {
|
||||
(*numOfNull) += 1;
|
||||
}
|
||||
|
||||
data += varDataLen(data);
|
||||
data += varDataTLen(data);
|
||||
}
|
||||
|
||||
*sum = 0;
|
||||
|
@ -299,11 +319,11 @@ static void getStatics_nchr(const TSKEY *primaryKey, const void *pData, int32_t
|
|||
ASSERT(numOfRow <= INT16_MAX);
|
||||
|
||||
for (int32_t i = 0; i < numOfRow; ++i) {
|
||||
if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_NCHAR)) {
|
||||
if (isNull(data, TSDB_DATA_TYPE_NCHAR)) {
|
||||
(*numOfNull) += 1;
|
||||
}
|
||||
|
||||
data += varDataLen(data);
|
||||
data += varDataTLen(data);
|
||||
}
|
||||
|
||||
*sum = 0;
|
||||
|
@ -315,7 +335,7 @@ static void getStatics_nchr(const TSKEY *primaryKey, const void *pData, int32_t
|
|||
|
||||
tDataTypeDescriptor tDataTypeDesc[11] = {
|
||||
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL, NULL},
|
||||
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, getStatics_i8},
|
||||
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, getStatics_bool},
|
||||
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
|
||||
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
|
||||
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt, getStatics_i32},
|
||||
|
|
|
@ -292,9 +292,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
#define TSDB_MAX_COMP_LEVEL 2
|
||||
#define TSDB_DEFAULT_COMP_LEVEL 2
|
||||
|
||||
#define TSDB_MIN_WAL_LEVEL 0
|
||||
#define TSDB_MAX_WAL_LEVEL 2
|
||||
#define TSDB_DEFAULT_WAL_LEVEL 2
|
||||
#define TSDB_MIN_WAL_LEVEL 1
|
||||
#define TSDB_MAX_WAL_LEVEL 2
|
||||
#define TSDB_DEFAULT_WAL_LEVEL 1
|
||||
|
||||
#define TSDB_MIN_REPLICA_NUM 1
|
||||
#define TSDB_MAX_REPLICA_NUM 3
|
||||
|
|
|
@ -275,8 +275,8 @@ static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) {
|
|||
return TSDB_CODE_INVALID_OPTION;
|
||||
}
|
||||
|
||||
if (pCfg->replications > 1 && pCfg->walLevel <= TSDB_MIN_WAL_LEVEL) {
|
||||
mError("invalid db option walLevel:%d must > 0, while replica:%d > 1", pCfg->walLevel, pCfg->replications);
|
||||
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL) {
|
||||
mError("invalid db option walLevel:%d must be greater than 0", pCfg->walLevel);
|
||||
return TSDB_CODE_INVALID_OPTION;
|
||||
}
|
||||
|
||||
|
@ -871,8 +871,8 @@ static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
mTrace("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications);
|
||||
newCfg.replications = replications;
|
||||
|
||||
if (replications > 1 && pDb->cfg.walLevel <= TSDB_MIN_WAL_LEVEL) {
|
||||
mError("db:%s, walLevel:%d must > 0, while replica:%d > 1", pDb->name, pDb->cfg.walLevel, replications);
|
||||
if (pDb->cfg.walLevel < TSDB_MIN_WAL_LEVEL) {
|
||||
mError("db:%s, walLevel:%d must be greater than 0", pDb->name, pDb->cfg.walLevel);
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc);
|
|||
|
||||
int32_t tVariantToString(tVariant *pVar, char *dst);
|
||||
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, char type);
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, char type, bool includeLengthPrefix);
|
||||
|
||||
int32_t tVariantTypeSetType(tVariant *pVariant, char type);
|
||||
|
||||
|
|
|
@ -55,12 +55,6 @@
|
|||
((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
|
||||
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type)
|
||||
|
||||
typedef struct SPointInterpoSupporter {
|
||||
int32_t numOfCols;
|
||||
SArray* prev;
|
||||
SArray* next;
|
||||
} SPointInterpoSupporter;
|
||||
|
||||
typedef enum {
|
||||
// when query starts to execute, this status will set
|
||||
QUERY_NOT_COMPLETED = 0x1u,
|
||||
|
@ -122,111 +116,6 @@ static void buildTagQueryResult(SQInfo *pQInfo);
|
|||
static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo);
|
||||
static int32_t flushFromResultBuf(SQInfo *pQInfo);
|
||||
|
||||
bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) {
|
||||
#if 0
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (!isPointInterpoQuery(pQuery)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* for interpolate point query, points that are directly before/after the specified point are required
|
||||
*/
|
||||
if (isFirstLastRowQuery(pQuery)) {
|
||||
assert(!QUERY_IS_ASC_QUERY(pQuery));
|
||||
} else {
|
||||
assert(QUERY_IS_ASC_QUERY(pQuery));
|
||||
}
|
||||
assert(pPointInterpSupporter != NULL && pQuery->skey == pQuery->ekey);
|
||||
|
||||
SCacheBlock *pBlock = NULL;
|
||||
|
||||
qTrace("QInfo:%p get next data point, fileId:%d, slot:%d, pos:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId,
|
||||
pQuery->slot, pQuery->pos);
|
||||
|
||||
// save the point that is directly after or equals to the specified point
|
||||
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pNextPoint, pQuery->pos);
|
||||
|
||||
/*
|
||||
* 1. for last_row query, return immediately.
|
||||
* 2. the specified timestamp equals to the required key, interpolation according to neighbor points is not necessary
|
||||
* for interp query.
|
||||
*/
|
||||
TSKEY actualKey = *(TSKEY *)pPointInterpSupporter->pNextPoint[0];
|
||||
if (isFirstLastRowQuery(pQuery) || actualKey == pQuery->skey) {
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
|
||||
/*
|
||||
* the retrieved ts may not equals to pMeterObj->lastKey due to cache re-allocation
|
||||
* set the pQuery->ekey/pQuery->skey/pQuery->lastKey to be the new value.
|
||||
*/
|
||||
if (pQuery->ekey != actualKey) {
|
||||
pQuery->skey = actualKey;
|
||||
pQuery->ekey = actualKey;
|
||||
pQuery->lastKey = actualKey;
|
||||
pSupporter->rawSKey = actualKey;
|
||||
pSupporter->rawEKey = actualKey;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/* the qualified point is not the first point in data block */
|
||||
if (pQuery->pos > 0) {
|
||||
int32_t prevPos = pQuery->pos - 1;
|
||||
|
||||
/* save the point that is directly after the specified point */
|
||||
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, prevPos);
|
||||
} else {
|
||||
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
|
||||
|
||||
// savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos);
|
||||
|
||||
// backwards movement would not set the pQuery->pos correct. We need to set it manually later.
|
||||
moveToNextBlock(pRuntimeEnv, QUERY_DESC_FORWARD_STEP, searchFn, true);
|
||||
|
||||
/*
|
||||
* no previous data exists.
|
||||
* reset the status and load the data block that contains the qualified point
|
||||
*/
|
||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
||||
qTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64
|
||||
", out of range",
|
||||
GET_QINFO_ADDR(pQuery), pRuntimeEnv->startPos.fileId, pRuntimeEnv->startPos.slot,
|
||||
pRuntimeEnv->startPos.pos, pQuery->skey, pQuery->ekey);
|
||||
|
||||
// no result, return immediately
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
return false;
|
||||
} else { // prev has been located
|
||||
if (pQuery->fileId >= 0) {
|
||||
pQuery->pos = pQuery->pBlock[pQuery->slot].numOfRows - 1;
|
||||
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
|
||||
|
||||
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
|
||||
pQuery->fileId, pQuery->slot, pQuery->pos, pQuery->pos);
|
||||
} else {
|
||||
// moveToNextBlock make sure there is a available cache block, if exists
|
||||
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
|
||||
pBlock = &pRuntimeEnv->cacheBlock;
|
||||
|
||||
pQuery->pos = pBlock->numOfRows - 1;
|
||||
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
|
||||
|
||||
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
|
||||
pQuery->fileId, pQuery->slot, pBlock->numOfRows - 1, pQuery->pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pQuery->skey = *(TSKEY *)pPointInterpSupporter->pPrevPoint[0];
|
||||
pQuery->ekey = *(TSKEY *)pPointInterpSupporter->pNextPoint[0];
|
||||
pQuery->lastKey = pQuery->skey;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
||||
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
||||
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
||||
|
@ -944,11 +833,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
SColumnInfoData *pColInfo = NULL;
|
||||
|
||||
TSKEY *primaryKeyCol = NULL;
|
||||
if (pDataBlock != NULL) {
|
||||
pColInfo = taosArrayGet(pDataBlock, 0);
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, 0);
|
||||
primaryKeyCol = (TSKEY *)(pColInfo->pData);
|
||||
}
|
||||
|
||||
|
@ -1222,9 +1109,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
continue;
|
||||
}
|
||||
|
||||
// all startOffset are identical
|
||||
// offset -= pCtx[0].startOffset;
|
||||
|
||||
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset);
|
||||
|
||||
|
@ -1350,10 +1234,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
|
|||
|
||||
if (tpField != NULL) {
|
||||
pCtx->preAggVals.isSet = true;
|
||||
pCtx->preAggVals.statis = *pStatis;
|
||||
if (pCtx->preAggVals.statis.numOfNull == -1) {
|
||||
pCtx->preAggVals.statis.numOfNull = pBlockInfo->rows; // todo :can not be -1
|
||||
}
|
||||
pCtx->preAggVals.statis = *tpField;
|
||||
assert(pCtx->preAggVals.statis.numOfNull <= pBlockInfo->rows);
|
||||
} else {
|
||||
pCtx->preAggVals.isSet = false;
|
||||
}
|
||||
|
@ -1747,54 +1629,6 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6
|
|||
}
|
||||
}
|
||||
|
||||
static UNUSED_FUNC bool doGetQueryPos(TSKEY key, SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupporter) {
|
||||
#if 0
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
SMeterObj * pMeterObj = pRuntimeEnv->pTabObj;
|
||||
|
||||
/* key in query range. If not, no qualified in disk file */
|
||||
if (key != -1 && key <= pQuery->window.ekey) {
|
||||
if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */
|
||||
return getNeighborPoints(pQInfo, pMeterObj, pPointInterpSupporter);
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
} else { // key > pQuery->window.ekey, abort for normal query, continue for interp query
|
||||
if (isPointInterpoQuery(pQuery)) {
|
||||
return getNeighborPoints(pQInfo, pMeterObj, pPointInterpSupporter);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupporter, void *pMeterObj,
|
||||
TSKEY nextKey) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (isFirstLastRowQuery(pQuery)) {
|
||||
/*
|
||||
* if the pQuery->window.skey != pQuery->window.ekey for last_row query,
|
||||
* the query range is existed, so set them both the value of nextKey
|
||||
*/
|
||||
if (pQuery->window.skey != pQuery->window.ekey) {
|
||||
assert(pQuery->window.skey >= pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery) &&
|
||||
nextKey >= pQuery->window.ekey && nextKey <= pQuery->window.skey);
|
||||
|
||||
pQuery->window.skey = nextKey;
|
||||
pQuery->window.ekey = nextKey;
|
||||
}
|
||||
|
||||
return getNeighborPoints(pQInfo, pMeterObj, pPointInterpSupporter);
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
static void setScanLimitationByResultBuffer(SQuery *pQuery) {
|
||||
if (isTopBottomQuery(pQuery)) {
|
||||
pQuery->checkBuffer = 0;
|
||||
|
@ -1990,159 +1824,6 @@ static UNUSED_FUNC void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t t
|
|||
pCtx->param[index].nLen = len;
|
||||
}
|
||||
|
||||
/**
|
||||
* param[1]: default value/previous value of specified timestamp
|
||||
* param[2]: next value of specified timestamp
|
||||
* param[3]: denotes if the result is a precious result or interpolation results
|
||||
*
|
||||
* @param pQInfo
|
||||
* @param pQInfo
|
||||
* @param pInterpoRaw
|
||||
*/
|
||||
void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupport) {
|
||||
#if 0
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
// not point interpolation query, abort
|
||||
if (!isPointInterpoQuery(pQuery)) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t count = 1;
|
||||
TSKEY key = *(TSKEY *)pPointInterpSupport->next[0];
|
||||
|
||||
if (key == pQuery->window.skey) {
|
||||
// the queried timestamp has value, return it directly without interpolation
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
tVariantCreateFromBinary(&pRuntimeEnv->pCtx[i].param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT);
|
||||
|
||||
pRuntimeEnv->pCtx[i].param[0].i64Key = key;
|
||||
pRuntimeEnv->pCtx[i].param[0].nType = TSDB_DATA_TYPE_BIGINT;
|
||||
}
|
||||
} else {
|
||||
// set the direct previous(next) point for process
|
||||
count = 2;
|
||||
|
||||
if (pQuery->fillType == TSDB_FILL_SET_VALUE) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
||||
// only the function of interp needs the corresponding information
|
||||
if (pCtx->functionId != TSDB_FUNC_INTERP) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx->numOfParams = 4;
|
||||
|
||||
SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
|
||||
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
|
||||
|
||||
SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail;
|
||||
|
||||
// for primary timestamp column, set the flag
|
||||
if (pQuery->pSelectExpr[i].base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
pInterpDetail->primaryCol = 1;
|
||||
}
|
||||
|
||||
tVariantCreateFromBinary(&pCtx->param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT);
|
||||
|
||||
if (isNull((char *)&pQuery->fillVal[i], pCtx->inputType)) {
|
||||
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
|
||||
} else {
|
||||
tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType);
|
||||
}
|
||||
|
||||
pInterpDetail->ts = pQuery->window.skey;
|
||||
pInterpDetail->type = pQuery->fillType;
|
||||
}
|
||||
} else {
|
||||
TSKEY prevKey = *(TSKEY *)pPointInterpSupport->pPrevPoint[0];
|
||||
TSKEY nextKey = *(TSKEY *)pPointInterpSupport->pNextPoint[0];
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
||||
// tag column does not need the interp environment
|
||||
if (pQuery->pSelectExpr[i].base.functionId == TSDB_FUNC_TAG) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t colInBuf = 0; // pQuery->pSelectExpr[i].base.colInfo.colIdxInBuf;
|
||||
SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
|
||||
|
||||
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
|
||||
SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail;
|
||||
|
||||
// int32_t type = GET_COLUMN_TYPE(pQuery, i);
|
||||
int32_t type = 0;
|
||||
assert(0);
|
||||
|
||||
// for primary timestamp column, set the flag
|
||||
if (pQuery->pSelectExpr[i].base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
pInterpDetail->primaryCol = 1;
|
||||
} else {
|
||||
doSetInterpVal(pCtx, prevKey, type, 1, pPointInterpSupport->pPrevPoint[colInBuf]);
|
||||
doSetInterpVal(pCtx, nextKey, type, 2, pPointInterpSupport->pNextPoint[colInBuf]);
|
||||
}
|
||||
|
||||
tVariantCreateFromBinary(&pRuntimeEnv->pCtx[i].param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT);
|
||||
|
||||
pInterpDetail->ts = pQInfo->runtimeEnv.pQuery->window.skey;
|
||||
pInterpDetail->type = pQuery->fillType;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSupport) {
|
||||
#if 0
|
||||
if (isPointInterpoQuery(pQuery)) {
|
||||
pInterpoSupport->pPrevPoint = malloc(pQuery->numOfCols * POINTER_BYTES);
|
||||
pInterpoSupport->pNextPoint = malloc(pQuery->numOfCols * POINTER_BYTES);
|
||||
|
||||
pInterpoSupport->numOfCols = pQuery->numOfCols;
|
||||
|
||||
/* get appropriated size for one row data source*/
|
||||
int32_t len = 0;
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
len += pQuery->colList[i].bytes;
|
||||
}
|
||||
|
||||
// assert(PRIMARY_TSCOL_LOADED(pQuery));
|
||||
|
||||
void *prev = calloc(1, len);
|
||||
void *next = calloc(1, len);
|
||||
|
||||
int32_t offset = 0;
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
pInterpoSupport->pPrevPoint[i] = prev + offset;
|
||||
pInterpoSupport->pNextPoint[i] = next + offset;
|
||||
|
||||
offset += pQuery->colList[i].bytes;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void pointInterpSupporterDestroy(SPointInterpoSupporter *pPointInterpSupport) {
|
||||
#if 0
|
||||
if (pPointInterpSupport->numOfCols <= 0 || pPointInterpSupport->pPrevPoint == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tfree(pPointInterpSupport->pPrevPoint[0]);
|
||||
tfree(pPointInterpSupport->pNextPoint[0]);
|
||||
|
||||
tfree(pPointInterpSupport->pPrevPoint);
|
||||
tfree(pPointInterpSupport->pNextPoint);
|
||||
|
||||
pPointInterpSupport->numOfCols = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t getInitialPageNum(SQInfo *pQInfo) {
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
int32_t INITIAL_RESULT_ROWS_VALUE = 16;
|
||||
|
@ -4331,9 +4012,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
|||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
|
||||
// SPointInterpoSupporter interpInfo = {0};
|
||||
// pointInterpSupporterInit(pQuery, &interpInfo);
|
||||
|
||||
/*
|
||||
* in case of last_row query without query range, we set the query timestamp to be
|
||||
* STable->lastKey. Otherwise, keep the initial query time range unchanged.
|
||||
|
@ -4346,13 +4024,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
|||
// }
|
||||
// }
|
||||
|
||||
/*
|
||||
* here we set the value for before and after the specified time into the
|
||||
* parameter for interpolation query
|
||||
*/
|
||||
// pointInterpSupporterSetData(pQInfo, &interpInfo);
|
||||
// pointInterpSupporterDestroy(&interpInfo);
|
||||
|
||||
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
||||
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
|
||||
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
|
||||
|
@ -4429,6 +4100,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|||
|
||||
assert(pTableQueryInfo != NULL);
|
||||
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
|
||||
printf("table:%d, groupIndex:%d, rows:%d\n", pTableQueryInfo->id.tid, pTableQueryInfo->groupIdx, blockInfo.tid);
|
||||
|
||||
SDataStatis *pStatis = NULL;
|
||||
|
||||
|
@ -6340,7 +6012,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
memcpy(dst, data, varDataTLen(data));
|
||||
} else {// todo refactor, return the true length of binary|nchar data
|
||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
|
||||
assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type);
|
||||
assert(bytes <= pExprInfo[j].bytes && type == pExprInfo[j].type);
|
||||
|
||||
char* dst = pQuery->sdata[j]->data + i * bytes;
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
|
|
|
@ -209,7 +209,7 @@ bool like_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
|||
bool like_nchar(SColumnFilterElem* pFilter, char* minval, char *maxval) {
|
||||
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
|
||||
|
||||
return WCSPatternMatch((wchar_t*) pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval)/TSDB_NCHAR_SIZE, &info) == TSDB_PATTERN_MATCH;
|
||||
return WCSPatternMatch((wchar_t*)pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval)/TSDB_NCHAR_SIZE, &info) == TSDB_PATTERN_MATCH;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -137,11 +137,10 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
|||
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
||||
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
||||
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
||||
|
||||
int32_t v = (*p - num);
|
||||
assert(v >= 0 && v <= pWindowResInfo->size);
|
||||
|
||||
taosHashPut(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
|
||||
sizeof(int32_t));
|
||||
taosHashPut(pWindowResInfo->hashList, (char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t));
|
||||
}
|
||||
|
||||
pWindowResInfo->curIndex = -1;
|
||||
|
|
|
@ -363,8 +363,6 @@ static int32_t toBinary(tVariant *pVariant, char **pDest, int32_t *pDestSize) {
|
|||
|
||||
taosUcs4ToMbs(pVariant->wpz, newSize, pBuf);
|
||||
free(pVariant->wpz);
|
||||
|
||||
/* terminated string */
|
||||
pBuf[newSize] = 0;
|
||||
} else {
|
||||
taosUcs4ToMbs(pVariant->wpz, newSize, *pDest);
|
||||
|
@ -598,7 +596,7 @@ static int32_t convertToBool(tVariant *pVariant, int64_t *pDest) {
|
|||
*
|
||||
* todo handle the return value
|
||||
*/
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, char type, bool includeLengthPrefix) {
|
||||
if (pVariant == NULL || (pVariant->nType != 0 && !isValidDataType(pVariant->nType, pVariant->nLen))) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -765,13 +763,30 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
|
|||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY: {
|
||||
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
|
||||
setVardataNull(payload,TSDB_DATA_TYPE_BINARY);
|
||||
} else {
|
||||
if (pVariant->nType != TSDB_DATA_TYPE_BINARY) {
|
||||
toBinary(pVariant, &payload, &pVariant->nLen);
|
||||
if (!includeLengthPrefix) {
|
||||
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
|
||||
*(uint8_t*) payload = TSDB_DATA_BINARY_NULL;
|
||||
} else {
|
||||
strncpy(payload, pVariant->pz, pVariant->nLen);
|
||||
if (pVariant->nType != TSDB_DATA_TYPE_BINARY) {
|
||||
toBinary(pVariant, &payload, &pVariant->nLen);
|
||||
} else {
|
||||
strncpy(payload, pVariant->pz, pVariant->nLen);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
|
||||
setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
|
||||
} else {
|
||||
char *p = varDataVal(payload);
|
||||
|
||||
if (pVariant->nType != TSDB_DATA_TYPE_BINARY) {
|
||||
toBinary(pVariant, &p, &pVariant->nLen);
|
||||
} else {
|
||||
strncpy(p, pVariant->pz, pVariant->nLen);
|
||||
}
|
||||
|
||||
varDataSetLen(payload, pVariant->nLen);
|
||||
assert(p == varDataVal(payload));
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -785,15 +800,33 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
|
||||
setVardataNull(payload,TSDB_DATA_TYPE_NCHAR);
|
||||
} else {
|
||||
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
|
||||
toNchar(pVariant, &payload, &pVariant->nLen);
|
||||
if (!includeLengthPrefix) {
|
||||
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
|
||||
*(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
|
||||
} else {
|
||||
wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen);
|
||||
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
|
||||
toNchar(pVariant, &payload, &pVariant->nLen);
|
||||
} else {
|
||||
wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
|
||||
setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
|
||||
} else {
|
||||
char *p = varDataVal(payload);
|
||||
|
||||
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
|
||||
toNchar(pVariant, &p, &pVariant->nLen);
|
||||
} else {
|
||||
wcsncpy((wchar_t *)p, pVariant->wpz, pVariant->nLen);
|
||||
}
|
||||
|
||||
varDataSetLen(payload, pVariant->nLen); // the length may be changed after toNchar function called
|
||||
assert(p == varDataVal(payload));
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -119,7 +119,7 @@ static char* getTagIndexKey(const void* pData) {
|
|||
STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable);
|
||||
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
|
||||
int16_t type = 0;
|
||||
void * res = tdQueryTagByID(row, pCol->colId,&type);
|
||||
void * res = tdQueryTagByID(row, pCol->colId, &type);
|
||||
ASSERT(type == pCol->type);
|
||||
return res;
|
||||
}
|
||||
|
@ -244,30 +244,18 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t
|
|||
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
||||
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||
|
||||
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
|
||||
STColumn* pCol = NULL;
|
||||
*val = tdQueryTagByID(pTable->tagVal, colId, type);
|
||||
|
||||
// todo binary search
|
||||
for(int32_t col = 0; col < schemaNCols(pSchema); ++col) {
|
||||
STColumn* p = schemaColAt(pSchema, col);
|
||||
if (p->colId == colId) {
|
||||
pCol = p;
|
||||
break;
|
||||
if (*val != NULL) {
|
||||
switch(*type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR: *bytes = varDataLen(*val); break;
|
||||
case TSDB_DATA_TYPE_NULL: *bytes = 0; break;
|
||||
default:
|
||||
*bytes = tDataTypeDesc[*type].nSize;break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pCol == NULL) {
|
||||
return -1; // No matched tags. Maybe the modification of tags has not been done yet.
|
||||
}
|
||||
|
||||
SDataRow row = (SDataRow)pTable->tagVal;
|
||||
int16_t tagtype = 0;
|
||||
char* d = tdQueryTagByID(row, pCol->colId, &tagtype);
|
||||
//ASSERT((int8_t)tagtype == pCol->type)
|
||||
*val = d;
|
||||
*type = pCol->type;
|
||||
*bytes = pCol->bytes;
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -95,7 +95,6 @@ typedef struct STsdbQueryHandle {
|
|||
SQueryFilePos cur; // current position
|
||||
int16_t order;
|
||||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
SCompBlock* pBlock;
|
||||
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
|
||||
int32_t numOfBlocks;
|
||||
SArray* pColumns; // column list, SColumnInfoData array list
|
||||
|
@ -118,6 +117,11 @@ typedef struct STsdbQueryHandle {
|
|||
|
||||
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
||||
SArray* sa);
|
||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
|
||||
STsdbQueryHandle* pQueryHandle);
|
||||
|
||||
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
|
||||
pBlockLoadInfo->slot = -1;
|
||||
|
@ -326,7 +330,21 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
|||
(pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
|
||||
STimeWindow* win = &pHandle->cur.win;
|
||||
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
|
||||
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
|
||||
|
||||
// update the last key value
|
||||
pCheckInfo->lastKey = win->ekey + step;
|
||||
pHandle->cur.lastKey = win->ekey + step;
|
||||
pHandle->cur.mixBlock = true;
|
||||
|
||||
if (!ASCENDING_TRAVERSE(pHandle->order)) {
|
||||
SWAP(win->skey, win->ekey, TSKEY);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -470,12 +488,6 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
|
|||
return pLocalIdList;
|
||||
}
|
||||
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
||||
SArray* sa);
|
||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
|
||||
STsdbQueryHandle* pQueryHandle);
|
||||
|
||||
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
|
||||
STsdbRepo *pRepo = pQueryHandle->pTsdb;
|
||||
SCompData* data = calloc(1, sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
|
||||
|
@ -591,21 +603,14 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
|||
*
|
||||
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
|
||||
*/
|
||||
// if (pQueryHandle->outputCapacity < binfo.rows) {
|
||||
// SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
||||
// doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||
//
|
||||
// doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
|
||||
// taosArrayDestroy(sa);
|
||||
// } else {
|
||||
pQueryHandle->realNumOfRows = binfo.rows;
|
||||
|
||||
cur->rows = binfo.rows;
|
||||
cur->win = binfo.window;
|
||||
cur->mixBlock = false;
|
||||
cur->blockCompleted = true;
|
||||
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
|
||||
// }
|
||||
assert(pQueryHandle->outputCapacity >= binfo.rows);
|
||||
pQueryHandle->realNumOfRows = binfo.rows;
|
||||
|
||||
cur->rows = binfo.rows;
|
||||
cur->win = binfo.window;
|
||||
cur->mixBlock = false;
|
||||
cur->blockCompleted = true;
|
||||
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1305,7 +1310,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
|||
|
||||
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
// todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity
|
||||
assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);
|
||||
|
||||
while (pQueryHandle->activeIndex < numOfTables) {
|
||||
if (hasMoreDataInCache(pQueryHandle)) {
|
||||
|
@ -1615,56 +1620,29 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
// copy data from cache into data block
|
||||
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
|
||||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||
SQueryFilePos* cur = &pHandle->cur;
|
||||
|
||||
STable* pTable = pCheckInfo->pTableObj;
|
||||
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
|
||||
STable* pTable = NULL;
|
||||
|
||||
// there are data in file
|
||||
if (pHandle->cur.fid >= 0) {
|
||||
|
||||
SDataBlockInfo blockInfo = {
|
||||
.uid = pTable->tableId.uid,
|
||||
.tid = pTable->tableId.tid,
|
||||
.rows = pHandle->cur.rows,
|
||||
.window = pHandle->cur.win,
|
||||
.numOfCols = QH_GET_NUM_OF_COLS(pHandle),
|
||||
};
|
||||
|
||||
return blockInfo;
|
||||
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
|
||||
pTable = pBlockInfo->pTableCheckInfo->pTableObj;
|
||||
} else {
|
||||
// TODO move to next function
|
||||
if (pTable->mem != NULL && pHandle->type != TSDB_QUERY_TYPE_EXTERNAL) { // create mem table iterator if it is not created yet
|
||||
assert(pCheckInfo->iter != NULL);
|
||||
STimeWindow* win = &cur->win;
|
||||
|
||||
cur->rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
|
||||
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
|
||||
|
||||
// update the last key value
|
||||
pCheckInfo->lastKey = win->ekey + step;
|
||||
cur->lastKey = win->ekey + step;
|
||||
cur->mixBlock = true;
|
||||
}
|
||||
|
||||
if (!ASCENDING_TRAVERSE(pHandle->order)) {
|
||||
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
|
||||
}
|
||||
|
||||
SDataBlockInfo blockInfo = {
|
||||
.uid = pTable->tableId.uid,
|
||||
.tid = pTable->tableId.tid,
|
||||
.rows = pHandle->cur.rows,
|
||||
.window = pHandle->cur.win,
|
||||
.numOfCols = QH_GET_NUM_OF_COLS(pHandle),
|
||||
};
|
||||
|
||||
return blockInfo;
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||
pTable = pCheckInfo->pTableObj;
|
||||
}
|
||||
|
||||
SDataBlockInfo blockInfo = {
|
||||
.uid = pTable->tableId.uid,
|
||||
.tid = pTable->tableId.tid,
|
||||
.rows = cur->rows,
|
||||
.window = cur->win,
|
||||
.numOfCols = QH_GET_NUM_OF_COLS(pHandle),
|
||||
};
|
||||
|
||||
return blockInfo;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1855,12 +1833,7 @@ void filterPrepare(void* expr, void* param) {
|
|||
pInfo->q = (char*) pCond->arr;
|
||||
} else {
|
||||
pInfo->q = calloc(1, pSchema->bytes);
|
||||
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
tVariantDump(pCond, varDataVal(pInfo->q), pSchema->type);
|
||||
varDataSetLen(pInfo->q, pCond->nLen); // the length may be changed after dump, so assign its value after dump
|
||||
} else {
|
||||
tVariantDump(pCond, pInfo->q, pSchema->type);
|
||||
}
|
||||
tVariantDump(pCond, pInfo->q, pSchema->type, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1990,13 +1963,11 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
|
|||
val = (char*) elem->pTable->name;
|
||||
type = TSDB_DATA_TYPE_BINARY;
|
||||
} else {
|
||||
// STSchema* pTSchema = (STSchema*) pInfo->param; // todo table schema is identical to stable schema??
|
||||
int16_t type;
|
||||
// int32_t offset = pTSchema->columns[pInfo->colIndex].offset;
|
||||
// val = tdGetRowDataOfCol(elem->pTable->tagVal, pInfo->sch.type, TD_DATA_ROW_HEAD_SIZE + offset);
|
||||
val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &type);
|
||||
// ASSERT(pInfo->sch.type == type);
|
||||
int16_t t1;
|
||||
val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &t1);
|
||||
assert(pInfo->sch.type == t1);
|
||||
}
|
||||
|
||||
//todo :the val is possible to be null, so check it out carefully
|
||||
int32_t ret = 0;
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
|
|
|
@ -30,24 +30,19 @@ typedef void (*_hash_free_fn_t)(void *param);
|
|||
|
||||
typedef struct SHashNode {
|
||||
char *key;
|
||||
union {
|
||||
// union {
|
||||
struct SHashNode * prev;
|
||||
struct SHashEntry *prev1;
|
||||
};
|
||||
|
||||
// struct SHashEntry *prev1;
|
||||
// };
|
||||
//
|
||||
struct SHashNode *next;
|
||||
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
|
||||
uint32_t keyLen; // length of the key
|
||||
char data[];
|
||||
} SHashNode;
|
||||
|
||||
typedef struct SHashEntry {
|
||||
SHashNode *next;
|
||||
uint32_t num;
|
||||
} SHashEntry;
|
||||
|
||||
typedef struct SHashObj {
|
||||
SHashEntry ** hashList;
|
||||
SHashNode **hashList;
|
||||
size_t capacity; // number of slots
|
||||
size_t size; // number of elements in hash table
|
||||
_hash_fn_t hashFp; // hash function
|
||||
|
|
|
@ -83,17 +83,10 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
|||
int32_t len = MIN(length, HASH_MAX_CAPACITY);
|
||||
|
||||
uint32_t i = 4;
|
||||
while (i < len) i = (i << 1U);
|
||||
while (i < len) i = (i << 1u);
|
||||
return i;
|
||||
}
|
||||
|
||||
/**
|
||||
* inplace update node in hash table
|
||||
* @param pHashObj hash table object
|
||||
* @param pNode hash data node
|
||||
*/
|
||||
static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode);
|
||||
|
||||
/**
|
||||
* Get SHashNode from hashlist, nodes from trash are not included.
|
||||
* @param pHashObj Cache objection
|
||||
|
@ -105,10 +98,9 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode);
|
|||
FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t *hashVal) {
|
||||
uint32_t hash = (*pHashObj->hashFp)(key, keyLen);
|
||||
|
||||
int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
|
||||
SHashEntry *pEntry = pHashObj->hashList[slot];
|
||||
int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
|
||||
SHashNode *pNode = pHashObj->hashList[slot];
|
||||
|
||||
SHashNode *pNode = pEntry->next;
|
||||
while (pNode) {
|
||||
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
|
||||
break;
|
||||
|
@ -190,17 +182,13 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
|
|||
|
||||
pHashObj->hashFp = fn;
|
||||
|
||||
pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry *));
|
||||
pHashObj->hashList = (SHashNode **)calloc(pHashObj->capacity, POINTER_BYTES);
|
||||
if (pHashObj->hashList == NULL) {
|
||||
free(pHashObj);
|
||||
uError("failed to allocate memory, reason:%s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
||||
}
|
||||
|
||||
if (threadsafe) {
|
||||
#if defined(LINUX)
|
||||
pHashObj->lock = calloc(1, sizeof(pthread_rwlock_t));
|
||||
|
@ -252,7 +240,18 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
|||
return -1;
|
||||
}
|
||||
|
||||
doUpdateHashTable(pHashObj, pNewNode);
|
||||
if (pNewNode->prev) {
|
||||
pNewNode->prev->next = pNewNode;
|
||||
} else {
|
||||
int32_t slot = HASH_INDEX(pNewNode->hashVal, pHashObj->capacity);
|
||||
|
||||
assert(pHashObj->hashList[slot] == pNode);
|
||||
pHashObj->hashList[slot] = pNewNode;
|
||||
}
|
||||
|
||||
if (pNewNode->next) {
|
||||
(pNewNode->next)->prev = pNewNode;
|
||||
}
|
||||
}
|
||||
|
||||
__unlock(pHashObj->lock);
|
||||
|
@ -287,24 +286,19 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
}
|
||||
|
||||
SHashNode *pNext = pNode->next;
|
||||
if (pNode->prev != NULL) {
|
||||
if (pNode->prev == NULL) {
|
||||
int32_t slot = HASH_INDEX(val, pHashObj->capacity);
|
||||
if (pHashObj->hashList[slot]->next == pNode) {
|
||||
pHashObj->hashList[slot]->next = pNext;
|
||||
} else {
|
||||
pNode->prev->next = pNext;
|
||||
}
|
||||
assert(pHashObj->hashList[slot] == pNode);
|
||||
|
||||
pHashObj->hashList[slot] = pNext;
|
||||
} else {
|
||||
pNode->prev->next = pNext;
|
||||
}
|
||||
|
||||
|
||||
if (pNext != NULL) {
|
||||
pNext->prev = pNode->prev;
|
||||
}
|
||||
|
||||
uint32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||
|
||||
SHashEntry *pEntry = pHashObj->hashList[index];
|
||||
pEntry->num--;
|
||||
|
||||
pHashObj->size--;
|
||||
|
||||
pNode->next = NULL;
|
||||
|
@ -325,8 +319,7 @@ void taosHashCleanup(SHashObj *pHashObj) {
|
|||
|
||||
if (pHashObj->hashList) {
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
pNode = pEntry->next;
|
||||
pNode = pHashObj->hashList[i];
|
||||
|
||||
while (pNode) {
|
||||
pNext = pNode->next;
|
||||
|
@ -337,8 +330,6 @@ void taosHashCleanup(SHashObj *pHashObj) {
|
|||
free(pNode);
|
||||
pNode = pNext;
|
||||
}
|
||||
|
||||
tfree(pEntry);
|
||||
}
|
||||
|
||||
free(pHashObj->hashList);
|
||||
|
@ -385,13 +376,13 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
|
|||
assert(pIter->pCur == NULL && pIter->pNext == NULL);
|
||||
|
||||
while (1) {
|
||||
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
|
||||
if (pEntry->next == NULL) {
|
||||
SHashNode *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
|
||||
if (pEntry == NULL) {
|
||||
pIter->entryIndex++;
|
||||
continue;
|
||||
}
|
||||
|
||||
pIter->pCur = pEntry->next;
|
||||
pIter->pCur = pEntry;
|
||||
|
||||
if (pIter->pCur->next) {
|
||||
pIter->pNext = pIter->pCur->next;
|
||||
|
@ -444,25 +435,25 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
|
|||
int32_t num = 0;
|
||||
|
||||
for (int32_t i = 0; i < pHashObj->size; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
if (num < pEntry->num) {
|
||||
num = pEntry->num;
|
||||
SHashNode *pEntry = pHashObj->hashList[i];
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t j = 0;
|
||||
while(pEntry != NULL) {
|
||||
pEntry = pEntry->next;
|
||||
j++;
|
||||
}
|
||||
|
||||
if (num < j) {
|
||||
num = j;
|
||||
}
|
||||
}
|
||||
|
||||
return num;
|
||||
}
|
||||
|
||||
void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
|
||||
if (pNode->prev1) {
|
||||
pNode->prev1->next = pNode;
|
||||
}
|
||||
|
||||
if (pNode->next) {
|
||||
(pNode->next)->prev = pNode;
|
||||
}
|
||||
}
|
||||
|
||||
void taosHashTableResize(SHashObj *pHashObj) {
|
||||
if (pHashObj->size < pHashObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
|
||||
return;
|
||||
|
@ -479,69 +470,53 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
|||
return;
|
||||
}
|
||||
|
||||
// int64_t st = taosGetTimestampUs();
|
||||
|
||||
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize);
|
||||
if (pNewEntry == NULL) {
|
||||
void *pNewEntry = realloc(pHashObj->hashList, POINTER_BYTES * newSize);
|
||||
if (pNewEntry == NULL) {// todo handle error
|
||||
// uTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
|
||||
return;
|
||||
}
|
||||
|
||||
pHashObj->hashList = pNewEntry;
|
||||
for (int32_t i = pHashObj->capacity; i < newSize; ++i) {
|
||||
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
||||
}
|
||||
memset(&pHashObj->hashList[pHashObj->capacity], 0, POINTER_BYTES * (newSize - pHashObj->capacity));
|
||||
|
||||
pHashObj->capacity = newSize;
|
||||
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
|
||||
pNode = pEntry->next;
|
||||
pNode = pHashObj->hashList[i];
|
||||
if (pNode != NULL) {
|
||||
assert(pNode->prev1 == pEntry && pEntry->num > 0);
|
||||
assert(pNode->prev == NULL);
|
||||
}
|
||||
|
||||
while (pNode) {
|
||||
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||
if (j == i) { // this key resides in the same slot, no need to relocate it
|
||||
if (j == i) { // this key locates in the same slot, no need to relocate it
|
||||
pNode = pNode->next;
|
||||
} else {
|
||||
pNext = pNode->next;
|
||||
|
||||
// remove from current slot
|
||||
assert(pNode->prev1 != NULL);
|
||||
|
||||
if (pNode->prev1 == pEntry) { // first node of the overflow linked list
|
||||
pEntry->next = pNode->next;
|
||||
if (pNode->prev == NULL) { // first node of the overflow linked list
|
||||
pHashObj->hashList[i] = pNext;
|
||||
} else {
|
||||
pNode->prev->next = pNode->next;
|
||||
pNode->prev->next = pNext;
|
||||
}
|
||||
|
||||
pEntry->num--;
|
||||
assert(pEntry->num >= 0);
|
||||
|
||||
if (pNode->next != NULL) {
|
||||
(pNode->next)->prev = pNode->prev;
|
||||
if (pNext != NULL) {
|
||||
pNext->prev = pNode->prev;
|
||||
}
|
||||
|
||||
// clear pointer
|
||||
pNode->next = NULL;
|
||||
pNode->prev = NULL;
|
||||
|
||||
// added into new slot
|
||||
pNode->next = NULL;
|
||||
pNode->prev1 = NULL;
|
||||
|
||||
SHashEntry *pNewIndexEntry = pHashObj->hashList[j];
|
||||
|
||||
if (pNewIndexEntry->next != NULL) {
|
||||
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
|
||||
|
||||
pNewIndexEntry->next->prev = pNode;
|
||||
SHashNode *pNew = pHashObj->hashList[j];
|
||||
if (pNew != NULL) {
|
||||
assert(pNew->prev == NULL);
|
||||
pNew->prev = pNode;
|
||||
}
|
||||
|
||||
pNode->next = pNewIndexEntry->next;
|
||||
pNode->prev1 = pNewIndexEntry;
|
||||
|
||||
pNewIndexEntry->next = pNode;
|
||||
pNewIndexEntry->num++;
|
||||
pNode->next = pNew;
|
||||
pHashObj->hashList[j] = pNode;
|
||||
|
||||
// continue
|
||||
pNode = pNext;
|
||||
|
@ -549,7 +524,6 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
|||
}
|
||||
}
|
||||
|
||||
// int64_t et = taosGetTimestampUs();
|
||||
// uTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
|
||||
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
|
||||
}
|
||||
|
@ -595,19 +569,17 @@ SHashNode *doUpdateHashNode(SHashNode *pNode, const void *key, size_t keyLen, co
|
|||
void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
|
||||
assert(pNode != NULL);
|
||||
|
||||
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||
SHashEntry *pEntry = pHashObj->hashList[index];
|
||||
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||
|
||||
pNode->next = pEntry->next;
|
||||
|
||||
if (pEntry->next) {
|
||||
pEntry->next->prev = pNode;
|
||||
SHashNode* pEntry = pHashObj->hashList[index];
|
||||
if (pEntry != NULL) {
|
||||
pEntry->prev = pNode;
|
||||
|
||||
pNode->next = pEntry;
|
||||
pNode->prev = NULL;
|
||||
}
|
||||
|
||||
pEntry->next = pNode;
|
||||
pNode->prev1 = pEntry;
|
||||
|
||||
pEntry->num++;
|
||||
pHashObj->hashList[index] = pNode;
|
||||
pHashObj->size++;
|
||||
}
|
||||
|
||||
|
@ -616,13 +588,13 @@ SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
|
|||
|
||||
pIter->entryIndex++;
|
||||
while (pIter->entryIndex < pIter->pHashObj->capacity) {
|
||||
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
|
||||
if (pEntry->next == NULL) {
|
||||
SHashNode *pNode = pIter->pHashObj->hashList[pIter->entryIndex];
|
||||
if (pNode == NULL) {
|
||||
pIter->entryIndex++;
|
||||
continue;
|
||||
}
|
||||
|
||||
return pEntry->next;
|
||||
return pNode;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
|
|
@ -92,7 +92,7 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
|
|||
if (len1 != len2) {
|
||||
return len1 > len2? 1:-1;
|
||||
} else {
|
||||
int32_t ret = wcsncmp(varDataVal(pLeft), varDataVal(pRight), len1);
|
||||
int32_t ret = wcsncmp(varDataVal(pLeft), varDataVal(pRight), len1/TSDB_NCHAR_SIZE);
|
||||
if (ret == 0) {
|
||||
return 0;
|
||||
} else {
|
||||
|
|
|
@ -149,8 +149,8 @@ int main(int argc, char** argv) {
|
|||
}
|
||||
|
||||
TEST(testCase, hashTest) {
|
||||
// simpleTest();
|
||||
// stringKeyTest();
|
||||
// noLockPerformanceTest();
|
||||
// multithreadsTest();
|
||||
simpleTest();
|
||||
stringKeyTest();
|
||||
noLockPerformanceTest();
|
||||
multithreadsTest();
|
||||
}
|
|
@ -140,12 +140,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
|||
|
||||
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
|
||||
int accumBytes = 0;
|
||||
//dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||
dataRow = tdNewTagRowFromSchema(pDestTagSchema, numOfTags);
|
||||
|
||||
for (int i = 0; i < numOfTags; i++) {
|
||||
STColumn *pTCol = schemaColAt(pDestTagSchema, i);
|
||||
// tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
|
||||
tdAppendTagColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->colId);
|
||||
accumBytes += htons(pSchema[i + numOfColumns].bytes);
|
||||
}
|
||||
|
|
|
@ -108,10 +108,10 @@ $cache = 16 # 16MB
|
|||
$ablocks = 100
|
||||
$tblocks = 32 # max=512, automatically trimmed when exceeding
|
||||
$ctime = 36000 # 10 hours
|
||||
$wal = 0 # valid value is 0, 1, 2
|
||||
$wal = 1 # valid value is 1, 2
|
||||
$comp = 1 # max=32, automatically trimmed when exceeding
|
||||
|
||||
sql create database $db replica $replica days $days keep $keep maxrows $rows_db cache $cache ctime $ctime wal $wal comp $comp
|
||||
sql create database $db replica $replica days $days keep $keep maxrows $rows_db cache $cache blocks 4 ctime $ctime wal $wal comp $comp
|
||||
sql show databases
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
|
@ -129,18 +129,15 @@ if $data06 != 365,365,365 then
|
|||
return -1
|
||||
endi
|
||||
print data08 = $data08
|
||||
if $data08 != $rows_db then
|
||||
if $data08 != $cache then
|
||||
print expect $cache, actual:$data08
|
||||
return -1
|
||||
endi
|
||||
if $data09 != $cache then
|
||||
if $data09 != 4 then
|
||||
return -1
|
||||
endi
|
||||
sql drop database $db
|
||||
|
||||
# ablocks_smaller_than_tblocks
|
||||
#$ablocks = 50
|
||||
#$tblocks = 100
|
||||
#sql_error create database $db ablocks $ablocks tblocks $tblocks
|
||||
sql drop database $db
|
||||
|
||||
## param range tests
|
||||
# replica [1,3]
|
||||
|
@ -160,14 +157,11 @@ sql_error create database $db maxrows 199
|
|||
#sql_error create database $db maxrows 10001
|
||||
|
||||
# cache [100, 10485760]
|
||||
sql_error create database $db cache 99
|
||||
sql_error create database $db cache 0
|
||||
#sql_error create database $db cache 10485761
|
||||
|
||||
# ablocks [overwriten by 4*maxtablesPerVnode, 409600]
|
||||
sql_error create database $db ablocks -1
|
||||
#sql_error create database $db ablocks 409601
|
||||
|
||||
# tblocks [32, 4096 overwriten by 4096 if exceeds, Note added:2018-10-24]
|
||||
# blocks [32, 4096 overwriten by 4096 if exceeds, Note added:2018-10-24]
|
||||
#sql_error create database $db tblocks 31
|
||||
#sql_error create database $db tblocks 4097
|
||||
|
||||
|
@ -175,9 +169,10 @@ sql_error create database $db ablocks -1
|
|||
sql_error create database $db ctime 29
|
||||
sql_error create database $db ctime 40961
|
||||
|
||||
# wal {0, 1}
|
||||
# wal {1, 2}
|
||||
sql_error create database $db wal 0
|
||||
sql_error create database $db wal -1
|
||||
#sql_error create database $db wal 2
|
||||
sql_error create database $db wal 3
|
||||
|
||||
# comp {0, 1, 2}
|
||||
sql_error create database $db comp -1
|
||||
|
|
|
@ -400,6 +400,7 @@ endi
|
|||
$limit = $totalNum / 2
|
||||
sql select max(c1), min(c2), avg(c3), count(c4), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 group by t1 order by t1 asc limit $limit offset 0
|
||||
if $rows != 6 then
|
||||
print expect 6, actual:$rows
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 9 then
|
||||
|
|
Loading…
Reference in New Issue