Merge branch '3.0' into 3.0test/jcy

This commit is contained in:
jiacy-jcy 2022-07-05 13:35:06 +08:00
commit 0a4fce3594
31 changed files with 757 additions and 271 deletions

View File

@ -174,6 +174,8 @@ cmake .. -G "NMake Makefiles"
nmake nmake
``` ```
如果你使用的是 Visual Studio 2022 版本, 脚本 `vcvarsall.bat` 的默认安装路径是 `C:\Program Files\Microsoft Visual Studio\2022\Community\VC\Auxiliary\Build\vcvarsall.bat`
### Mac OS X 系统 ### Mac OS X 系统
安装 Xcode 命令行工具和 cmake. 在 Catalina 和 Big Sur 操作系统上,需要安装 XCode 11.4+ 版本。 安装 Xcode 命令行工具和 cmake. 在 Catalina 和 Big Sur 操作系统上,需要安装 XCode 11.4+ 版本。

View File

@ -136,7 +136,7 @@ cmake .. -DCPUTYPE=mips64 && cmake --build .
### On Windows platform ### On Windows platform
If you use the Visual Studio 2013, please open a command window by executing "cmd.exe". If you use Visual Studio 2013, please open a command window by executing "cmd.exe".
Please specify "amd64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat. Please specify "amd64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat.
```cmd ```cmd
mkdir debug && cd debug mkdir debug && cd debug
@ -145,7 +145,7 @@ cmake .. -G "NMake Makefiles"
nmake nmake
``` ```
If you use the Visual Studio 2019 or 2017: If you use Visual Studio 2019 or 2017:
please open a command window by executing "cmd.exe". please open a command window by executing "cmd.exe".
Please specify "x64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat. Please specify "x64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat.
@ -164,6 +164,8 @@ cmake .. -G "NMake Makefiles"
nmake nmake
``` ```
If you use Visual Studio 2022, the only change is the default path of `vcvarsall.bat`, which is `C:\Program Files\Microsoft Visual Studio\2022\Community\VC\Auxiliary\Build\vcvarsall.bat`.
### On Mac OS X platform ### On Mac OS X platform
Please install XCode command line tools and cmake. Verified with XCode 11.4+ on Catalina and Big Sur. Please install XCode command line tools and cmake. Verified with XCode 11.4+ on Catalina and Big Sur.

View File

@ -2687,10 +2687,12 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1;
pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention)); if (pRsp->numOfRetensions > 0) {
if (pRsp->pRetensions == NULL) { pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention));
terrno = TSDB_CODE_OUT_OF_MEMORY; if (pRsp->pRetensions == NULL) {
return -1; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} }
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) { for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {

View File

@ -83,8 +83,8 @@ int32_t smaBegin(SSma *pSma) {
/** /**
* @brief pre-commit for rollup sma. * @brief pre-commit for rollup sma.
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) perform persist task for qTaskInfo * 2) wait all triggered fetch tasks finished
* 3) wait all triggered fetch tasks finished * 3) perform persist task for qTaskInfo
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
@ -102,10 +102,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
// step 1: set persistence task paused // step 1: set persistence task paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2: perform persist task for qTaskInfo // step 2: wait all triggered fetch tasks finished
tdRSmaPersistExecImpl(pRSmaStat);
// step 3: wait all triggered fetch tasks finished
int32_t nLoops = 0; int32_t nLoops = 0;
while (1) { while (1) {
if (T_REF_VAL_GET(pStat) == 0) { if (T_REF_VAL_GET(pStat) == 0) {
@ -121,6 +118,9 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
} }
} }
// step 3: perform persist task for qTaskInfo
tdRSmaPersistExecImpl(pRSmaStat);
smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma)); smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -719,8 +719,10 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
ASSERT(numOfElem >= 0); ASSERT(numOfElem >= 0);
pAvgRes->count += numOfElem; pAvgRes->count += numOfElem;
if (IS_INTEGER_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
pAvgRes->sum.isum += pAgg->sum; pAvgRes->sum.isum += pAgg->sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pAvgRes->sum.usum += pAgg->sum;
} else if (IS_FLOAT_TYPE(type)) { } else if (IS_FLOAT_TYPE(type)) {
pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum)); pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
} }
@ -784,6 +786,64 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
break; break;
} }
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t* plist = (uint8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t* plist = (uint16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t* plist = (uint32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t* plist = (uint64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
float* plist = (float*)pCol->pData; float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
@ -825,8 +885,10 @@ _avg_over:
static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) { static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
pOutput->type = pInput->type; pOutput->type = pInput->type;
if (IS_INTEGER_TYPE(pOutput->type)) { if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->sum.isum += pInput->sum.isum; pOutput->sum.isum += pInput->sum.isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->sum.usum += pInput->sum.usum;
} else { } else {
pOutput->sum.dsum += pInput->sum.dsum; pOutput->sum.dsum += pInput->sum.dsum;
} }
@ -900,6 +962,22 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
LIST_AVG_N(pAvgRes->sum.isum, int64_t); LIST_AVG_N(pAvgRes->sum.isum, int64_t);
break; break;
} }
case TSDB_DATA_TYPE_UTINYINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint8_t);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint16_t);
break;
}
case TSDB_DATA_TYPE_UINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint32_t);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
LIST_AVG_N(pAvgRes->sum.dsum, float); LIST_AVG_N(pAvgRes->sum.dsum, float);
break; break;
@ -925,8 +1003,10 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_INTEGER_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
pDBuf->sum.isum += pSBuf->sum.isum; pDBuf->sum.isum += pSBuf->sum.isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pDBuf->sum.usum += pSBuf->sum.usum;
} else { } else {
pDBuf->sum.dsum += pSBuf->sum.dsum; pDBuf->sum.dsum += pSBuf->sum.dsum;
} }
@ -941,8 +1021,10 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t type = pAvgRes->type; int32_t type = pAvgRes->type;
if (IS_INTEGER_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count); pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count);
} else { } else {
pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count); pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
} }

View File

@ -784,12 +784,144 @@ static int32_t parseBoolFromValueNode(STranslateContext* pCxt, SValueNode* pVal)
} }
} }
static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SDataType targetDt) { static EDealRes translateDurationValue(STranslateContext* pCxt, SValueNode* pVal) {
uint8_t precision = getPrecisionFromCurrStmt(pCxt->pCurrStmt, targetDt.precision); if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit,
pVal->node.resType.precision = precision; pVal->node.resType.precision) != TSDB_CODE_SUCCESS) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int64_t*)&pVal->typeData = pVal->datum.i;
return DEAL_RES_CONTINUE;
}
static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal, SDataType targetDt, bool strict) {
int32_t code = TSDB_CODE_SUCCESS;
switch (targetDt.type) {
case TSDB_DATA_TYPE_BOOL:
if (TSDB_CODE_SUCCESS != parseBoolFromValueNode(pCxt, pVal)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(bool*)&pVal->typeData = pVal->datum.b;
break;
case TSDB_DATA_TYPE_TINYINT: {
code = toInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.i);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_TINYINT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int8_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
code = toInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.i);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_SMALLINT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int16_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_INT: {
code = toInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.i);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_INT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int32_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_BIGINT: {
code = toInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.i);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_BIGINT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int64_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
code = toUInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.u);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_UTINYINT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(uint8_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
code = toUInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.u);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_USMALLINT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(uint16_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_UINT: {
code = toUInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.u);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_UINT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(uint32_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
code = toUInteger(pVal->literal, strlen(pVal->literal), 10, &pVal->datum.u);
if (strict && (TSDB_CODE_SUCCESS != code || !IS_VALID_UBIGINT(pVal->datum.i))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(uint64_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
pVal->datum.d = taosStr2Double(pVal->literal, NULL);
*(float*)&pVal->typeData = pVal->datum.d;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
pVal->datum.d = taosStr2Double(pVal->literal, NULL);
*(double*)&pVal->typeData = pVal->datum.d;
break;
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
int32_t len = TMIN(targetDt.bytes - VARSTR_HEADER_SIZE, pVal->node.resType.bytes);
varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pCxt, pVal)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int64_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_NCHAR: {
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
int32_t len = 0;
if (!taosMbsToUcs4(pVal->literal, strlen(pVal->literal), (TdUcs4*)varDataVal(pVal->datum.p),
targetDt.bytes - VARSTR_HEADER_SIZE, &len)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
varDataSetLen(pVal->datum.p, len);
break;
}
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
default:
break;
}
return DEAL_RES_CONTINUE;
}
static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SDataType targetDt, bool strict) {
if (pVal->placeholderNo > 0 || pVal->isNull) { if (pVal->placeholderNo > 0 || pVal->isNull) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) { if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) {
// TODO // TODO
// pVal->node.resType = targetDt; // pVal->node.resType = targetDt;
@ -797,114 +929,18 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
pVal->isNull = true; pVal->isNull = true;
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
if (pVal->isDuration) {
if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) !=
TSDB_CODE_SUCCESS) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int64_t*)&pVal->typeData = pVal->datum.i;
} else {
switch (targetDt.type) {
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
if (TSDB_CODE_SUCCESS != parseBoolFromValueNode(pCxt, pVal)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(bool*)&pVal->typeData = pVal->datum.b;
break;
case TSDB_DATA_TYPE_TINYINT: {
pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
*(int8_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
*(int16_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_INT: {
pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
*(int32_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_BIGINT: {
pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
*(int64_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
*(uint8_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
*(uint16_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_UINT: {
pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
*(uint32_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
*(uint64_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
pVal->datum.d = taosStr2Double(pVal->literal, NULL);
*(float*)&pVal->typeData = pVal->datum.d;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
pVal->datum.d = taosStr2Double(pVal->literal, NULL);
*(double*)&pVal->typeData = pVal->datum.d;
break;
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
int32_t len = TMIN(targetDt.bytes - VARSTR_HEADER_SIZE, pVal->node.resType.bytes);
varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pCxt, pVal)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int64_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_NCHAR: {
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
int32_t len = 0; pVal->node.resType.precision = getPrecisionFromCurrStmt(pCxt->pCurrStmt, targetDt.precision);
if (!taosMbsToUcs4(pVal->literal, strlen(pVal->literal), (TdUcs4*)varDataVal(pVal->datum.p),
targetDt.bytes - VARSTR_HEADER_SIZE, &len)) { EDealRes res = DEAL_RES_CONTINUE;
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); if (pVal->isDuration) {
} res = translateDurationValue(pCxt, pVal);
varDataSetLen(pVal->datum.p, len); } else {
break; res = translateNormalValue(pCxt, pVal, targetDt, strict);
}
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
default:
break;
}
} }
pVal->node.resType = targetDt; pVal->node.resType = targetDt;
pVal->translate = true; pVal->translate = true;
return DEAL_RES_CONTINUE; return res;
} }
static int32_t calcTypeBytes(SDataType dt) { static int32_t calcTypeBytes(SDataType dt) {
@ -920,7 +956,7 @@ static int32_t calcTypeBytes(SDataType dt) {
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
SDataType dt = pVal->node.resType; SDataType dt = pVal->node.resType;
dt.bytes = calcTypeBytes(dt); dt.bytes = calcTypeBytes(dt);
return translateValueImpl(pCxt, pVal, dt); return translateValueImpl(pCxt, pVal, dt, false);
} }
static bool isMultiResFunc(SNode* pNode) { static bool isMultiResFunc(SNode* pNode) {
@ -3216,13 +3252,30 @@ static bool validRollupFunc(const char* pFunc) {
return false; return false;
} }
static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs) { static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs, bool createStable,
SDbCfgInfo* pDbCfg) {
if (NULL == pFuncs) { if (NULL == pFuncs) {
if (NULL != pDbCfg->pRetensions) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"To create a super table in a database with the retensions parameter configured, "
"the 'ROLLUP' option must be present");
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (1 != LIST_LENGTH(pFuncs) || !validRollupFunc(((SFunctionNode*)nodesListGetNode(pFuncs, 0))->functionName)) { if (!createStable || NULL == pDbCfg->pRetensions) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"Invalid option rollup: Only supported for create super table in databases "
"configured with the 'RETENTIONS' option");
}
if (1 != LIST_LENGTH(pFuncs)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION,
"Invalid option rollup: only one function is allowed");
}
const char* pFunc = ((SFunctionNode*)nodesListGetNode(pFuncs, 0))->functionName;
if (!validRollupFunc(pFunc)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION,
"Invalid option rollup: %s function is not supported", pFunc);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3248,8 +3301,8 @@ static int32_t checkTableTagsSchema(STranslateContext* pCxt, SHashObj* pHash, SN
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if ((TSDB_DATA_TYPE_VARCHAR == pTag->dataType.type && pTag->dataType.bytes > TSDB_MAX_BINARY_LEN) || if ((TSDB_DATA_TYPE_VARCHAR == pTag->dataType.type && calcTypeBytes(pTag->dataType) > TSDB_MAX_BINARY_LEN) ||
(TSDB_DATA_TYPE_NCHAR == pTag->dataType.type && pTag->dataType.bytes > TSDB_MAX_NCHAR_LEN)) { (TSDB_DATA_TYPE_NCHAR == pTag->dataType.type && calcTypeBytes(pTag->dataType) > TSDB_MAX_NCHAR_LEN)) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
} }
} }
@ -3270,11 +3323,11 @@ static int32_t checkTableTagsSchema(STranslateContext* pCxt, SHashObj* pHash, SN
return code; return code;
} }
static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, SNodeList* pCols) { static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, int32_t ntags, SNodeList* pCols) {
int32_t ncols = LIST_LENGTH(pCols); int32_t ncols = LIST_LENGTH(pCols);
if (ncols < TSDB_MIN_COLUMNS) { if (ncols < TSDB_MIN_COLUMNS) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
} else if (ncols > TSDB_MAX_COLUMNS) { } else if (ncols + ntags > TSDB_MAX_COLUMNS) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
} }
@ -3330,7 +3383,7 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt
int32_t code = checkTableTagsSchema(pCxt, pHash, pStmt->pTags); int32_t code = checkTableTagsSchema(pCxt, pHash, pStmt->pTags);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkTableColsSchema(pCxt, pHash, pStmt->pCols); code = checkTableColsSchema(pCxt, pHash, LIST_LENGTH(pStmt->pTags), pStmt->pCols);
} }
taosHashCleanup(pHash); taosHashCleanup(pHash);
@ -3359,11 +3412,18 @@ static int32_t getTableMaxDelayOption(STranslateContext* pCxt, SValueNode* pVal,
pMaxDelay); pMaxDelay);
} }
static int32_t checkTableMaxDelayOption(STranslateContext* pCxt, STableOptions* pOptions) { static int32_t checkTableMaxDelayOption(STranslateContext* pCxt, STableOptions* pOptions, bool createStable,
SDbCfgInfo* pDbCfg) {
if (NULL == pOptions->pMaxDelay) { if (NULL == pOptions->pMaxDelay) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!createStable || NULL == pDbCfg->pRetensions) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"Invalid option maxdelay: Only supported for create super table in databases "
"configured with the 'RETENTIONS' option");
}
if (LIST_LENGTH(pOptions->pMaxDelay) > 2) { if (LIST_LENGTH(pOptions->pMaxDelay) > 2) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION, "maxdelay"); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION, "maxdelay");
} }
@ -3382,11 +3442,18 @@ static int32_t getTableWatermarkOption(STranslateContext* pCxt, SValueNode* pVal
pMaxDelay); pMaxDelay);
} }
static int32_t checkTableWatermarkOption(STranslateContext* pCxt, STableOptions* pOptions) { static int32_t checkTableWatermarkOption(STranslateContext* pCxt, STableOptions* pOptions, bool createStable,
SDbCfgInfo* pDbCfg) {
if (NULL == pOptions->pWatermark) { if (NULL == pOptions->pWatermark) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!createStable || NULL == pDbCfg->pRetensions) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"Invalid option watermark: Only supported for create super table in databases "
"configured with the 'RETENTIONS' option");
}
if (LIST_LENGTH(pOptions->pWatermark) > 2) { if (LIST_LENGTH(pOptions->pWatermark) > 2) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION, "watermark"); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION, "watermark");
} }
@ -3400,13 +3467,20 @@ static int32_t checkTableWatermarkOption(STranslateContext* pCxt, STableOptions*
return code; return code;
} }
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) {
int32_t code = checkTableMaxDelayOption(pCxt, pStmt->pOptions); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) { SDbCfgInfo dbCfg = {0};
code = checkTableWatermarkOption(pCxt, pStmt->pOptions); if (createStable) {
code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs); code = checkTableMaxDelayOption(pCxt, pStmt->pOptions, createStable, &dbCfg);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkTableWatermarkOption(pCxt, pStmt->pOptions, createStable, &dbCfg);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs, createStable, &dbCfg);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkTableSmaOption(pCxt, pStmt); code = checkTableSmaOption(pCxt, pStmt);
@ -3685,7 +3759,7 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
int32_t code = checkCreateTable(pCxt, pStmt); int32_t code = checkCreateTable(pCxt, pStmt, true);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCreateStbReq(pCxt, pStmt, &createReq); code = buildCreateStbReq(pCxt, pStmt, &createReq);
} }
@ -4265,6 +4339,39 @@ static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
tNameGetFullDbName(&name, pDbFName); tNameGetFullDbName(&name, pDbFName);
} }
static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
if (NULL == pSelect->pWindow ||
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_wstartts", ((SFunctionNode*)pProj)->functionName))) {
return TSDB_CODE_SUCCESS;
}
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pFunc->functionName, "_wstartts");
strcpy(pFunc->node.aliasName, pFunc->functionName);
int32_t code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pFunc);
}
return code;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true;
int32_t code = addWstartTsToCreateStreamQuery(pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt, false, &pReq->ast, NULL);
}
return code;
}
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists; pReq->igExists = pStmt->ignoreExists;
@ -4278,13 +4385,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
tNameExtractFullName(&name, pReq->targetStbFullName); tNameExtractFullName(&name, pReq->targetStbFullName);
} }
pCxt->createStream = true; int32_t code = buildCreateStreamQuery(pCxt, pStmt->pQuery, pReq);
int32_t code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pReq->sql = strdup(pCxt->pParseCxt->pSql); pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) { if (NULL == pReq->sql) {
@ -5233,7 +5334,7 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot; SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
int32_t code = checkCreateTable(pCxt, pStmt); int32_t code = checkCreateTable(pCxt, pStmt, false);
SVgroupInfo info = {0}; SVgroupInfo info = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info); code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
@ -5337,7 +5438,7 @@ static int32_t createTagValFromVal(STranslateContext* pCxt, SDataType targetDt,
if (NULL == *pVal) { if (NULL == *pVal) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
return DEAL_RES_ERROR == translateValueImpl(pCxt, *pVal, targetDt) ? pCxt->errCode : TSDB_CODE_SUCCESS; return DEAL_RES_ERROR == translateValueImpl(pCxt, *pVal, targetDt, true) ? pCxt->errCode : TSDB_CODE_SUCCESS;
} }
static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode, static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode,
@ -5721,7 +5822,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
} }
SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema); SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema);
if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt)) { if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt, true)) {
return pCxt->errCode; return pCxt->errCode;
} }

View File

@ -159,7 +159,7 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
* c4 | column | DOUBLE | 8 | * c4 | column | DOUBLE | 8 |
* c5 | column | DOUBLE | 8 | * c5 | column | DOUBLE | 8 |
*/ */
void generateTestT1(MockCatalogService* mcs) { void generateTestTables(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6) ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6)
.setPrecision(TSDB_TIME_PRECISION_MILLI) .setPrecision(TSDB_TIME_PRECISION_MILLI)
.setVgid(1) .setVgid(1)
@ -183,23 +183,7 @@ void generateTestT1(MockCatalogService* mcs) {
* tag2 | tag | VARCHAR | 20 | * tag2 | tag | VARCHAR | 20 |
* tag3 | tag | TIMESTAMP | 8 | * tag3 | tag | TIMESTAMP | 8 |
* Child Table: st1s1, st1s2 * Child Table: st1s1, st1s2
*/ *
void generateTestST1(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 3)
.setPrecision(TSDB_TIME_PRECISION_MILLI)
.addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addColumn("c1", TSDB_DATA_TYPE_INT)
.addColumn("c2", TSDB_DATA_TYPE_BINARY, 20)
.addTag("tag1", TSDB_DATA_TYPE_INT)
.addTag("tag2", TSDB_DATA_TYPE_BINARY, 20)
.addTag("tag3", TSDB_DATA_TYPE_TIMESTAMP);
builder.done();
mcs->createSubTable("test", "st1", "st1s1", 1);
mcs->createSubTable("test", "st1", "st1s2", 2);
mcs->createSubTable("test", "st1", "st1s3", 1);
}
/*
* Super Table: st2 * Super Table: st2
* Field | Type | DataType | Bytes | * Field | Type | DataType | Bytes |
* ========================================================================== * ==========================================================================
@ -209,16 +193,32 @@ void generateTestST1(MockCatalogService* mcs) {
* jtag | tag | json | -- | * jtag | tag | json | -- |
* Child Table: st2s1, st2s2 * Child Table: st2s1, st2s2
*/ */
void generateTestST2(MockCatalogService* mcs) { void generateTestStables(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "st2", TSDB_SUPER_TABLE, 3, 1) {
.setPrecision(TSDB_TIME_PRECISION_MILLI) ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 3)
.addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP) .setPrecision(TSDB_TIME_PRECISION_MILLI)
.addColumn("c1", TSDB_DATA_TYPE_INT) .addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addColumn("c2", TSDB_DATA_TYPE_BINARY, 20) .addColumn("c1", TSDB_DATA_TYPE_INT)
.addTag("jtag", TSDB_DATA_TYPE_JSON); .addColumn("c2", TSDB_DATA_TYPE_BINARY, 20)
builder.done(); .addTag("tag1", TSDB_DATA_TYPE_INT)
mcs->createSubTable("test", "st2", "st2s1", 1); .addTag("tag2", TSDB_DATA_TYPE_BINARY, 20)
mcs->createSubTable("test", "st2", "st2s2", 2); .addTag("tag3", TSDB_DATA_TYPE_TIMESTAMP);
builder.done();
mcs->createSubTable("test", "st1", "st1s1", 1);
mcs->createSubTable("test", "st1", "st1s2", 2);
mcs->createSubTable("test", "st1", "st1s3", 1);
}
{
ITableBuilder& builder = mcs->createTableBuilder("test", "st2", TSDB_SUPER_TABLE, 3, 1)
.setPrecision(TSDB_TIME_PRECISION_MILLI)
.addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addColumn("c1", TSDB_DATA_TYPE_INT)
.addColumn("c2", TSDB_DATA_TYPE_BINARY, 20)
.addTag("jtag", TSDB_DATA_TYPE_JSON);
builder.done();
mcs->createSubTable("test", "st2", "st2s1", 1);
mcs->createSubTable("test", "st2", "st2s2", 2);
}
} }
void generateFunctions(MockCatalogService* mcs) { void generateFunctions(MockCatalogService* mcs) {
@ -233,6 +233,11 @@ void generateDnodes(MockCatalogService* mcs) {
mcs->createDnode(3, "host3", 7030); mcs->createDnode(3, "host3", 7030);
} }
void generateDatabases(MockCatalogService* mcs) {
mcs->createDatabase("test");
mcs->createDatabase("rollup_db", true);
}
} // namespace } // namespace
int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; } int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; }
@ -262,7 +267,7 @@ int32_t __catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char
} }
int32_t __catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* pDbCfg) { int32_t __catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* pDbCfg) {
return 0; return g_mockCatalogService->catalogGetDBCfg(dbFName, pDbCfg);
} }
int32_t __catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, int32_t __catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type,
@ -359,11 +364,11 @@ void initMetaDataEnv() {
} }
void generateMetaData() { void generateMetaData() {
generateDatabases(g_mockCatalogService.get());
generateInformationSchema(g_mockCatalogService.get()); generateInformationSchema(g_mockCatalogService.get());
generatePerformanceSchema(g_mockCatalogService.get()); generatePerformanceSchema(g_mockCatalogService.get());
generateTestT1(g_mockCatalogService.get()); generateTestTables(g_mockCatalogService.get());
generateTestST1(g_mockCatalogService.get()); generateTestStables(g_mockCatalogService.get());
generateTestST2(g_mockCatalogService.get());
generateFunctions(g_mockCatalogService.get()); generateFunctions(g_mockCatalogService.get());
generateDnodes(g_mockCatalogService.get()); generateDnodes(g_mockCatalogService.get());
g_mockCatalogService->showTables(); g_mockCatalogService->showTables();

View File

@ -140,6 +140,17 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const {
std::string dbFName(pDbFName);
DbCfgCache::const_iterator it = dbCfg_.find(dbFName.substr(std::string(pDbFName).find_last_of('.') + 1));
if (dbCfg_.end() == it) {
return TSDB_CODE_FAILED;
}
memcpy(pDbCfg, &(it->second), sizeof(SDbCfgInfo));
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const { int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
auto it = udf_.find(funcName); auto it = udf_.find(funcName);
if (udf_.end() == it) { if (udf_.end() == it) {
@ -323,12 +334,21 @@ class MockCatalogServiceImpl {
dnode_.insert(std::make_pair(dnodeId, epSet)); dnode_.insert(std::make_pair(dnodeId, epSet));
} }
void createDatabase(const std::string& db, bool rollup) {
SDbCfgInfo cfg = {0};
if (rollup) {
cfg.pRetensions = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SRetention));
}
dbCfg_.insert(std::make_pair(db, cfg));
}
private: private:
typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache; typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache;
typedef std::map<std::string, TableMetaCache> DbMetaCache; typedef std::map<std::string, TableMetaCache> DbMetaCache;
typedef std::map<std::string, std::shared_ptr<SFuncInfo>> UdfMetaCache; typedef std::map<std::string, std::shared_ptr<SFuncInfo>> UdfMetaCache;
typedef std::map<std::string, std::vector<STableIndexInfo>> IndexMetaCache; typedef std::map<std::string, std::vector<STableIndexInfo>> IndexMetaCache;
typedef std::map<int32_t, SEpSet> DnodeCache; typedef std::map<int32_t, SEpSet> DnodeCache;
typedef std::map<std::string, SDbCfgInfo> DbCfgCache;
uint64_t getNextId() { return id_++; } uint64_t getNextId() { return id_++; }
@ -486,6 +506,7 @@ class MockCatalogServiceImpl {
for (int32_t i = 0; i < ndbs; ++i) { for (int32_t i = 0; i < ndbs; ++i) {
SMetaRes res = {0}; SMetaRes res = {0};
res.pRes = taosMemoryCalloc(1, sizeof(SDbCfgInfo)); res.pRes = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
res.code = catalogGetDBCfg((const char*)taosArrayGet(pDbCfgReq, i), (SDbCfgInfo*)res.pRes);
taosArrayPush(*pDbCfgData, &res); taosArrayPush(*pDbCfgData, &res);
} }
} }
@ -576,6 +597,7 @@ class MockCatalogServiceImpl {
UdfMetaCache udf_; UdfMetaCache udf_;
IndexMetaCache index_; IndexMetaCache index_;
DnodeCache dnode_; DnodeCache dnode_;
DbCfgCache dbCfg_;
}; };
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {} MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {}
@ -605,6 +627,8 @@ void MockCatalogService::createDnode(int32_t dnodeId, const std::string& host, i
impl_->createDnode(dnodeId, host, port); impl_->createDnode(dnodeId, host, port);
} }
void MockCatalogService::createDatabase(const std::string& db, bool rollup) { impl_->createDatabase(db, rollup); }
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const { int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
return impl_->catalogGetTableMeta(pTableName, pTableMeta); return impl_->catalogGetTableMeta(pTableName, pTableMeta);
} }
@ -621,6 +645,10 @@ int32_t MockCatalogService::catalogGetDBVgInfo(const char* pDbFName, SArray** pV
return impl_->catalogGetDBVgInfo(pDbFName, pVgList); return impl_->catalogGetDBVgInfo(pDbFName, pVgList);
} }
int32_t MockCatalogService::catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const {
return impl_->catalogGetDBCfg(pDbFName, pDbCfg);
}
int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const { int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
return impl_->catalogGetUdfInfo(funcName, pInfo); return impl_->catalogGetUdfInfo(funcName, pInfo);
} }

View File

@ -63,11 +63,13 @@ class MockCatalogService {
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize); void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize);
void createSmaIndex(const SMCreateSmaReq* pReq); void createSmaIndex(const SMCreateSmaReq* pReq);
void createDnode(int32_t dnodeId, const std::string& host, int16_t port); void createDnode(int32_t dnodeId, const std::string& host, int16_t port);
void createDatabase(const std::string& db, bool rollup = false);
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const; int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const; int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const; int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const; int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const; int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const; int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const;
int32_t catalogGetDnodeList(SArray** pDnodes) const; int32_t catalogGetDnodeList(SArray** pDnodes) const;

View File

@ -38,9 +38,9 @@ TEST_F(ParserInitialATest, alterDnode) {
TEST_F(ParserInitialATest, alterDatabase) { TEST_F(ParserInitialATest, alterDatabase) {
useDb("root", "test"); useDb("root", "test");
run("ALTER DATABASE wxy_db CACHELAST 1 FSYNC 200 WAL 1"); run("ALTER DATABASE test CACHELAST 1 FSYNC 200 WAL 1");
run("ALTER DATABASE wxy_db KEEP 2400"); run("ALTER DATABASE test KEEP 2400");
} }
TEST_F(ParserInitialATest, alterLocal) { TEST_F(ParserInitialATest, alterLocal) {

View File

@ -359,11 +359,11 @@ TEST_F(ParserInitialCTest, createStable) {
memset(&expect, 0, sizeof(SMCreateStbReq)); memset(&expect, 0, sizeof(SMCreateStbReq));
}; };
auto setCreateStbReqFunc = [&](const char* pTbname, int8_t igExists = 0, int64_t delay1 = -1, int64_t delay2 = -1, auto setCreateStbReqFunc = [&](const char* pDbName, const char* pTbName, int8_t igExists = 0, int64_t delay1 = -1,
int64_t watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK, int64_t delay2 = -1, int64_t watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK,
int64_t watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK, int64_t watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK,
int32_t ttl = TSDB_DEFAULT_TABLE_TTL, const char* pComment = nullptr) { int32_t ttl = TSDB_DEFAULT_TABLE_TTL, const char* pComment = nullptr) {
int32_t len = snprintf(expect.name, sizeof(expect.name), "0.test.%s", pTbname); int32_t len = snprintf(expect.name, sizeof(expect.name), "0.%s.%s", pDbName, pTbName);
expect.name[len] = '\0'; expect.name[len] = '\0';
expect.igExists = igExists; expect.igExists = igExists;
expect.delay1 = delay1; expect.delay1 = delay1;
@ -454,14 +454,14 @@ TEST_F(ParserInitialCTest, createStable) {
tFreeSMCreateStbReq(&req); tFreeSMCreateStbReq(&req);
}); });
setCreateStbReqFunc("t1"); setCreateStbReqFunc("test", "t1");
addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP); addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP);
addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT); addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT);
addFieldToCreateStbReqFunc(false, "id", TSDB_DATA_TYPE_INT); addFieldToCreateStbReqFunc(false, "id", TSDB_DATA_TYPE_INT);
run("CREATE STABLE t1(ts TIMESTAMP, c1 INT) TAGS(id INT)"); run("CREATE STABLE t1(ts TIMESTAMP, c1 INT) TAGS(id INT)");
clearCreateStbReq(); clearCreateStbReq();
setCreateStbReqFunc("t1", 1, 100 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_MINUTE, 10, setCreateStbReqFunc("rollup_db", "t1", 1, 100 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_MINUTE, 10,
1 * MILLISECOND_PER_MINUTE, 100, "test create table"); 1 * MILLISECOND_PER_MINUTE, 100, "test create table");
addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0); addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT); addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT);
@ -493,7 +493,7 @@ TEST_F(ParserInitialCTest, createStable) {
addFieldToCreateStbReqFunc(false, "a13", TSDB_DATA_TYPE_BOOL); addFieldToCreateStbReqFunc(false, "a13", TSDB_DATA_TYPE_BOOL);
addFieldToCreateStbReqFunc(false, "a14", TSDB_DATA_TYPE_NCHAR, 30 * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); addFieldToCreateStbReqFunc(false, "a14", TSDB_DATA_TYPE_NCHAR, 30 * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
addFieldToCreateStbReqFunc(false, "a15", TSDB_DATA_TYPE_VARCHAR, 50 + VARSTR_HEADER_SIZE); addFieldToCreateStbReqFunc(false, "a15", TSDB_DATA_TYPE_VARCHAR, 50 + VARSTR_HEADER_SIZE);
run("CREATE STABLE IF NOT EXISTS test.t1(" run("CREATE STABLE IF NOT EXISTS rollup_db.t1("
"ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), " "ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), "
"c8 SMALLINT, c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, " "c8 SMALLINT, c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, "
"c13 NCHAR(30), c14 VARCHAR(50)) " "c13 NCHAR(30), c14 VARCHAR(50)) "
@ -507,12 +507,13 @@ TEST_F(ParserInitialCTest, createStable) {
TEST_F(ParserInitialCTest, createStableSemanticCheck) { TEST_F(ParserInitialCTest, createStableSemanticCheck) {
useDb("root", "test"); useDb("root", "test");
run("CREATE STABLE stb2 (ts TIMESTAMP, c1 INT) TAGS (tag1 INT) ROLLUP(CEIL)", TSDB_CODE_PAR_INVALID_ROLLUP_OPTION); run("CREATE STABLE rollup_db.stb2 (ts TIMESTAMP, c1 INT) TAGS (tag1 INT) ROLLUP(CEIL)",
TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
run("CREATE STABLE stb2 (ts TIMESTAMP, c1 INT) TAGS (tag1 INT) ROLLUP(MAX) MAX_DELAY 0s WATERMARK 1m", run("CREATE STABLE rollup_db.stb2 (ts TIMESTAMP, c1 INT) TAGS (tag1 INT) ROLLUP(MAX) MAX_DELAY 0s WATERMARK 1m",
TSDB_CODE_PAR_INVALID_RANGE_OPTION); TSDB_CODE_PAR_INVALID_RANGE_OPTION);
run("CREATE STABLE stb2 (ts TIMESTAMP, c1 INT) TAGS (tag1 INT) ROLLUP(MAX) MAX_DELAY 10s WATERMARK 18m", run("CREATE STABLE rollup_db.stb2 (ts TIMESTAMP, c1 INT) TAGS (tag1 INT) ROLLUP(MAX) MAX_DELAY 10s WATERMARK 18m",
TSDB_CODE_PAR_INVALID_RANGE_OPTION); TSDB_CODE_PAR_INVALID_RANGE_OPTION);
} }
@ -561,30 +562,33 @@ TEST_F(ParserInitialCTest, createStream) {
tFreeSCMCreateStreamReq(&req); tFreeSCMCreateStreamReq(&req);
}); });
setCreateStreamReqFunc("s1", "test", "create stream s1 as select * from t1"); setCreateStreamReqFunc("s1", "test", "create stream s1 as select count(*) from t1 interval(10s)");
run("CREATE STREAM s1 AS SELECT * FROM t1"); run("CREATE STREAM s1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
setCreateStreamReqFunc("s1", "test", "create stream if not exists s1 as select * from t1", nullptr, 1); setCreateStreamReqFunc("s1", "test", "create stream if not exists s1 as select count(*) from t1 interval(10s)",
run("CREATE STREAM IF NOT EXISTS s1 AS SELECT * FROM t1"); nullptr, 1);
run("CREATE STREAM IF NOT EXISTS s1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
setCreateStreamReqFunc("s1", "test", "create stream s1 into st1 as select * from t1", "st1"); setCreateStreamReqFunc("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1");
run("CREATE STREAM s1 INTO st1 AS SELECT * FROM t1"); run("CREATE STREAM s1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
setCreateStreamReqFunc( setCreateStreamReqFunc("s1", "test",
"s1", "test", "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired into st1 "
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired into st1 as select * from t1", "as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 1); "st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND,
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED INTO st1 AS SELECT * FROM t1"); 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED INTO st1 AS SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
} }
TEST_F(ParserInitialCTest, createStreamSemanticCheck) { TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
useDb("root", "test"); useDb("root", "test");
run("CREATE STREAM s1 AS SELECT PERCENTILE(c1, 30) FROM t1", TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC); run("CREATE STREAM s1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
} }
TEST_F(ParserInitialCTest, createTable) { TEST_F(ParserInitialCTest, createTable) {
@ -598,7 +602,7 @@ TEST_F(ParserInitialCTest, createTable) {
"c13 NCHAR(30), c15 VARCHAR(50)) " "c13 NCHAR(30), c15 VARCHAR(50)) "
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3)"); "TTL 100 COMMENT 'test create table' SMA(c1, c2, c3)");
run("CREATE TABLE IF NOT EXISTS test.t1(" run("CREATE TABLE IF NOT EXISTS rollup_db.t1("
"ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), " "ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), "
"c8 SMALLINT, c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, " "c8 SMALLINT, c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, "
"c13 NCHAR(30), c14 VARCHAR(50)) " "c13 NCHAR(30), c14 VARCHAR(50)) "
@ -617,6 +621,21 @@ TEST_F(ParserInitialCTest, createTable) {
// run("CREATE TABLE IF NOT EXISTS t1 USING st1 TAGS(1, 'wxy', NOW + 1S)"); // run("CREATE TABLE IF NOT EXISTS t1 USING st1 TAGS(1, 'wxy', NOW + 1S)");
} }
TEST_F(ParserInitialCTest, createTableSemanticCheck) {
useDb("root", "test");
string sql = "CREATE TABLE st1(ts TIMESTAMP, ";
for (int32_t i = 1; i < 4096; ++i) {
if (i > 1) {
sql.append(", ");
}
sql.append("c" + to_string(i) + " INT");
}
sql.append(") TAGS (t1 int)");
run(sql, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
}
TEST_F(ParserInitialCTest, createTopic) { TEST_F(ParserInitialCTest, createTopic) {
useDb("root", "test"); useDb("root", "test");

View File

@ -1220,6 +1220,7 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) { if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
((SScanLogicNode*)info.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType); int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0); SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);

View File

@ -65,6 +65,7 @@ int32_t syncInit() {
syncCleanUp(); syncCleanUp();
ret = -1; ret = -1;
} else { } else {
sDebug("sync rsetId:%" PRId64 " is open", tsNodeRefId);
ret = syncEnvStart(); ret = syncEnvStart();
} }
} }
@ -77,6 +78,7 @@ void syncCleanUp() {
ASSERT(ret == 0); ASSERT(ret == 0);
if (tsNodeRefId != -1) { if (tsNodeRefId != -1) {
sDebug("sync rsetId:%" PRId64 " is closed", tsNodeRefId);
taosCloseRef(tsNodeRefId); taosCloseRef(tsNodeRefId);
tsNodeRefId = -1; tsNodeRefId = -1;
} }
@ -96,6 +98,7 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
return -1; return -1;
} }
sDebug("vgId:%d, rid:%" PRId64 " is added to rsetId:%" PRId64, pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId);
return pSyncNode->rid; return pSyncNode->rid;
} }
@ -136,13 +139,14 @@ void syncStartStandBy(int64_t rid) {
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) return;
return;
} int32_t vgId = pSyncNode->vgId;
syncNodeClose(pSyncNode); syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid); taosRemoveRef(tsNodeRefId, rid);
sDebug("vgId:%d, rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId);
} }
int32_t syncSetStandby(int64_t rid) { int32_t syncSetStandby(int64_t rid) {

View File

@ -164,6 +164,7 @@
./test.sh -f tsim/sma/drop_sma.sim ./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
# --- valgrind # --- valgrind
./test.sh -f tsim/valgrind/checkError.sim -v ./test.sh -f tsim/valgrind/checkError.sim -v

View File

@ -0,0 +1,237 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database with retentions
sql create database d0 retentions 5s:7d,5m:21d,15m:365d;
sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 int, c2 float) tags (city binary(20),district binary(20)) rollup(max) max_delay 5s,5s watermark 2s,3s;
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data and trigger rollup
sql insert into ct1 values(now, 10, 10.0);
sql insert into ct1 values(now+1s, 1, 1.0);
sql insert into ct1 values(now+2s, 100, 100.0);
print =============== wait maxdelay 5+1 seconds for results
sleep 6000
print =============== select * from retention level 2 from memory
sql select * from ct1;
print $data00 $data01 $data02
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 1 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 0 from memory
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
#===================================================================
#==================== reboot to trigger commit data to file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== select * from retention level 2 from file
sql select * from ct1;
print $data00 $data01 $data02
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 1 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 0 from file
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
print =============== insert after rsma qtaskinfo recovery
sql insert into ct1 values(now, 50, 500.0);
sql insert into ct1 values(now+1s, 40, 40.0);
print =============== wait maxdelay 5+1 seconds for results
sleep 6000
print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery
sql select * from ct1;
print $data00 $data01 $data02
if $rows > 2 then
print retention level 2 file/mem rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file/mem result $data01 != 100 or 10
return -1
endi
endi
if $data02 != 500.00000 then
if $data02 != 100.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000
return -1
endi
endi
print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows > 2 then
print retention level 1 file/mem rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 1 file/mem result $data01 != 100 or 10
return -1
endi
endi
if $data02 != 500.00000 then
if $data02 != 100.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000
return -1
endi
endi
print =============== select * from retention level 0 from file and memory after rsma qtaskinfo recovery
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows < 1 then
print retention level 0 file/mem rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file/mem result $data01 != 10
return -1
endi
if $data11 != 1 then
print retention level 0 file/mem result $data11 != 1
return -1
endi
if $data21 != 100 then
print retention level 0 file/mem result $data21 != 100
return -1
endi
if $data31 != 50 then
print retention level 0 file/mem result $data31 != 50
return -1
endi
if $data32 != 500.00000 then
print retention level 0 file/mem result $data32 != 500.00000
return -1
endi
if $data41 != 40 then
print retention level 0 file/mem result $data41 != 40
return -1
endi
if $data42 != 40.00000 then
print retention level 0 file/mem result $data42 != 40.00000
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -139,7 +139,7 @@ class TDTestCase:
) )
for i in range(4): for i in range(4):
tdSql.execute( tdSql.execute(
f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -36,7 +36,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -53,7 +53,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -55,7 +55,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -55,7 +55,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -55,7 +55,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -55,7 +55,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -64,7 +64,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -53,7 +53,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -42,7 +42,7 @@ class TDTestCase:
) )
for i in range(4): for i in range(4):
tdSql.execute( tdSql.execute(
f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -97,7 +97,7 @@ class TDTestCase:
) )
for i in range(4): for i in range(4):
tdSql.execute( tdSql.execute(
f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -687,8 +687,8 @@ class TDTestCase:
# to_json() # to_json()
tdSql.query("select to_json('{\"abc\":123}') from jsons1_1") tdSql.query("select to_json('{\"abc\":123}') from jsons1_1")
tdSql.checkRows(2) tdSql.checkRows(2)
# tdSql.checkData(0, 0, '{"abc":123}') tdSql.checkData(0, 0, '{"abc":123}')
# tdSql.checkData(1, 0, '{"abc":123}') tdSql.checkData(1, 0, '{"abc":123}')
tdSql.query("select to_json('null') from jsons1_1") tdSql.query("select to_json('null') from jsons1_1")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 0, 'null') tdSql.checkData(0, 0, 'null')

View File

@ -109,7 +109,7 @@ class TDTestCase:
''' '''
) )
for i in range(20): for i in range(20):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9): for i in range(9):
tdSql.execute( tdSql.execute(

View File

@ -34,7 +34,7 @@ class TDTestCase:
) )
for i in range(self.tb_nums): for i in range(self.tb_nums):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
ts = self.ts ts = self.ts
for j in range(self.row_nums): for j in range(self.row_nums):
ts+=j*self.time_step ts+=j*self.time_step

@ -1 +1 @@
Subproject commit c885e967e490105999b84d009a15168728dfafaf Subproject commit c3815951fc80617ecd171f3743b8b4a4d0bc712e