Merge branch 'main' of github.com:taosdata/TDengine into szhou/fix-ts3400
This commit is contained in:
commit
51fa0aeb64
|
@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java
|
||||||
|
|
||||||
# 加入技术交流群
|
# 加入技术交流群
|
||||||
|
|
||||||
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小 T 为好友,即可入群。
|
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine1",加小 T 为好友,即可入群。
|
||||||
|
|
|
@ -102,6 +102,7 @@ typedef struct {
|
||||||
STqExecHandle execHandle; // exec
|
STqExecHandle execHandle; // exec
|
||||||
SRpcMsg* msg;
|
SRpcMsg* msg;
|
||||||
int32_t noDataPollCnt;
|
int32_t noDataPollCnt;
|
||||||
|
int8_t exec;
|
||||||
} STqHandle;
|
} STqHandle;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -55,6 +55,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
||||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||||
} else {
|
} else {
|
||||||
|
tqPushDataRsp(pTq, pHandle);
|
||||||
void* tmp = pHandle->msg->pCont;
|
void* tmp = pHandle->msg->pCont;
|
||||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||||
pHandle->msg->pCont = tmp;
|
pHandle->msg->pCont = tmp;
|
||||||
|
|
|
@ -162,6 +162,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isHandleExecuting(STqHandle* pHandle){
|
||||||
|
return 1 == atomic_load_8(&pHandle->exec);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
@ -177,6 +181,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while(isHandleExecuting(pHandle)){
|
||||||
|
tqInfo("sub is executing, pHandle:%p", pHandle);
|
||||||
|
taosMsleep(5);
|
||||||
|
}
|
||||||
|
atomic_store_8(&pHandle->exec, 1);
|
||||||
|
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
if(code != 0) {
|
if(code != 0) {
|
||||||
|
@ -193,6 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
atomic_store_8(&pHandle->exec, 0);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
else{
|
else{
|
||||||
|
@ -202,7 +213,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
|
|
||||||
|
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
|
||||||
// NOTE: this pHandle->consumerId may have been changed already.
|
// NOTE: this pHandle->consumerId may have been changed already.
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
@ -214,12 +224,14 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
// taosWUnLockLatch(&pTq->lock);
|
// taosWUnLockLatch(&pTq->lock);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
}
|
}
|
||||||
|
atomic_store_8(&pHandle->exec, 0);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
|
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SWalCkHead* pCkHead = NULL;
|
SWalCkHead* pCkHead = NULL;
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
|
@ -232,10 +244,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while(isHandleExecuting(pHandle)){
|
||||||
|
tqInfo("sub is executing, pHandle:%p", pHandle);
|
||||||
|
taosMsleep(5);
|
||||||
|
}
|
||||||
|
atomic_store_8(&pHandle->exec, 1);
|
||||||
|
|
||||||
if (offset->type != TMQ_OFFSET__LOG) {
|
if (offset->type != TMQ_OFFSET__LOG) {
|
||||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
code = -1;
|
||||||
return -1;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaRsp.metaRspLen > 0) {
|
if (metaRsp.metaRspLen > 0) {
|
||||||
|
@ -243,16 +261,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
|
||||||
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
|
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
|
||||||
taosMemoryFree(metaRsp.metaRsp);
|
taosMemoryFree(metaRsp.metaRsp);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
goto end;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||||
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
|
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
|
||||||
if (taosxRsp.blockNum > 0) {
|
if (taosxRsp.blockNum > 0) {
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
goto end;
|
||||||
return code;
|
|
||||||
}else {
|
}else {
|
||||||
*offset = taosxRsp.rspOffset;
|
*offset = taosxRsp.rspOffset;
|
||||||
}
|
}
|
||||||
|
@ -264,9 +280,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
int64_t fetchVer = offset->version + 1;
|
int64_t fetchVer = offset->version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
|
@ -281,9 +297,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
goto end;
|
||||||
taosMemoryFreeClear(pCkHead);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalCont* pHead = &pCkHead->head;
|
SWalCont* pHead = &pCkHead->head;
|
||||||
|
@ -295,9 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
if(totalRows > 0) {
|
if(totalRows > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
goto end;
|
||||||
taosMemoryFreeClear(pCkHead);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
||||||
|
@ -305,17 +317,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
metaRsp.resMsgType = pHead->msgType;
|
||||||
metaRsp.metaRspLen = pHead->bodyLen;
|
metaRsp.metaRspLen = pHead->bodyLen;
|
||||||
metaRsp.metaRsp = pHead->body;
|
metaRsp.metaRsp = pHead->body;
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
|
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||||
code = -1;
|
goto end;
|
||||||
taosMemoryFreeClear(pCkHead);
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = 0;
|
|
||||||
taosMemoryFreeClear(pCkHead);
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// process data
|
// process data
|
||||||
|
@ -325,29 +328,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
.ver = pHead->version,
|
.ver = pHead->version,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
|
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
|
||||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
|
if (code < 0) {
|
||||||
pRequest->subKey);
|
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey);
|
||||||
taosMemoryFreeClear(pCkHead);
|
goto end;
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
goto end;
|
||||||
taosMemoryFreeClear(pCkHead);
|
|
||||||
return code;
|
|
||||||
} else {
|
} else {
|
||||||
fetchVer++;
|
fetchVer++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
atomic_store_8(&pHandle->exec, 0);
|
||||||
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
||||||
|
|
|
@ -1437,12 +1437,12 @@ _return:
|
||||||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||||
pRes->code = code;
|
pRes->code = code;
|
||||||
pRes->pRes = NULL;
|
pRes->pRes = NULL;
|
||||||
|
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
|
||||||
|
tstrerror(code));
|
||||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||||
TSWAP(pTask->res, ctx->pResList);
|
TSWAP(pTask->res, ctx->pResList);
|
||||||
taskDone = true;
|
taskDone = true;
|
||||||
}
|
}
|
||||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
|
|
||||||
tstrerror(code));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->res && taskDone) {
|
if (pTask->res && taskDone) {
|
||||||
|
|
|
@ -1484,14 +1484,23 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SHashObj *pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
|
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
|
||||||
if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) {
|
if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) {
|
||||||
pValCtx[num++] = &pCtx[i];
|
pValCtx[num++] = &pCtx[i];
|
||||||
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
|
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
|
||||||
p = &pCtx[i];
|
void* data = taosHashGet(pSelectFuncs, pName, strlen(pName));
|
||||||
|
if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) {
|
||||||
|
p = NULL;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num));
|
||||||
|
p = &pCtx[i];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosHashCleanup(pSelectFuncs);
|
||||||
|
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
p->subsidiaries.pCtx = pValCtx;
|
p->subsidiaries.pCtx = pValCtx;
|
||||||
|
|
|
@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
|
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
|
||||||
|
if (subquery && pSetOp->opType == SET_OP_TYPE_UNION) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
SNode* pProj = NULL;
|
SNode* pProj = NULL;
|
||||||
WHERE_EACH(pProj, pSetOp->pProjectionList) {
|
WHERE_EACH(pProj, pSetOp->pProjectionList) {
|
||||||
|
|
|
@ -5326,7 +5326,8 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) {
|
if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) {
|
||||||
if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
|
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||||
|
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5351,6 +5352,11 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||||
|
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) > TSDB_MAX_BYTES_PER_ROW) {
|
if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) > TSDB_MAX_BYTES_PER_ROW) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
||||||
}
|
}
|
||||||
|
@ -8322,6 +8328,11 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||||
|
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
if (pTableMeta->tableInfo.rowSize + pReq->colModBytes - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
if (pTableMeta->tableInfo.rowSize + pReq->colModBytes - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,14 @@
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
|
||||||
|
static void debugPrintNode(SNode* pNode) {
|
||||||
|
char* pStr = NULL;
|
||||||
|
nodesNodeToString(pNode, false, &pStr, NULL);
|
||||||
|
printf("%s\n", pStr);
|
||||||
|
taosMemoryFree(pStr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
static void dumpQueryPlan(SQueryPlan* pPlan) {
|
static void dumpQueryPlan(SQueryPlan* pPlan) {
|
||||||
if (!tsQueryPlannerTrace) {
|
if (!tsQueryPlannerTrace) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -657,17 +657,34 @@ if $data20 != null then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print =============== error
|
print =============== error for normal table
|
||||||
sql create table tb2023(ts timestamp, f int);
|
sql create table tb2023(ts timestamp, f int);
|
||||||
sql_error alter table tb2023 add column v varchar(16375);
|
sql_error alter table tb2023 add column v varchar(16375);
|
||||||
sql_error alter table tb2023 add column v varchar(16385);
|
sql_error alter table tb2023 add column v varchar(16385);
|
||||||
sql_error alter table tb2023 add column v varchar(33100);
|
sql_error alter table tb2023 add column v varchar(33100);
|
||||||
sql alter table tb2023 add column v varchar(16374);
|
sql alter table tb2023 add column v varchar(16374);
|
||||||
|
sql_error alter table tb2023 modify column v varchar(16375);
|
||||||
sql desc tb2023
|
sql desc tb2023
|
||||||
sql alter table tb2023 drop column v
|
sql alter table tb2023 drop column v
|
||||||
sql_error alter table tb2023 add column v nchar(4094);
|
sql_error alter table tb2023 add column v nchar(4094);
|
||||||
sql alter table tb2023 add column v nchar(4093);
|
sql alter table tb2023 add column v nchar(4093);
|
||||||
|
sql_error alter table tb2023 modify column v nchar(4094);
|
||||||
sql desc tb2023
|
sql desc tb2023
|
||||||
|
|
||||||
|
print =============== error for super table
|
||||||
|
sql create table stb2023(ts timestamp, f int) tags(t1 int);
|
||||||
|
sql_error alter table stb2023 add column v varchar(16375);
|
||||||
|
sql_error alter table stb2023 add column v varchar(16385);
|
||||||
|
sql_error alter table stb2023 add column v varchar(33100);
|
||||||
|
sql alter table stb2023 add column v varchar(16374);
|
||||||
|
sql_error alter table stb2023 modify column v varchar(16375);
|
||||||
|
sql desc stb2023
|
||||||
|
sql alter table stb2023 drop column v
|
||||||
|
sql_error alter table stb2023 add column v nchar(4094);
|
||||||
|
sql alter table stb2023 add column v nchar(4093);
|
||||||
|
sql_error alter table stb2023 modify column v nchar(4094);
|
||||||
|
sql desc stb2023
|
||||||
|
|
||||||
print ======= over
|
print ======= over
|
||||||
sql drop database d1
|
sql drop database d1
|
||||||
sql select * from information_schema.ins_databases
|
sql select * from information_schema.ins_databases
|
||||||
|
|
|
@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
|
||||||
sql_error alter table tb modify column c2 binary(9);
|
sql_error alter table tb modify column c2 binary(9);
|
||||||
sql_error alter table tb modify column c2 binary(-9);
|
sql_error alter table tb modify column c2 binary(-9);
|
||||||
sql_error alter table tb modify column c2 binary(0);
|
sql_error alter table tb modify column c2 binary(0);
|
||||||
sql alter table tb modify column c2 binary(17000);
|
sql_error alter table tb modify column c2 binary(17000);
|
||||||
sql_error alter table tb modify column c2 nchar(30);
|
sql_error alter table tb modify column c2 nchar(30);
|
||||||
sql_error alter table tb modify column c3 double;
|
sql_error alter table tb modify column c3 double;
|
||||||
sql_error alter table tb modify column c3 nchar(10);
|
sql_error alter table tb modify column c3 nchar(10);
|
||||||
|
|
|
@ -25,4 +25,21 @@ if $data05 != @0021001@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql create table st (ts timestamp, f int) tags (t int);
|
||||||
|
sql insert into ct1 using st tags(1) values(now, 1)(now+1s, 2)
|
||||||
|
sql insert into ct2 using st tags(2) values(now+2s, 3)(now+3s, 4)
|
||||||
|
sql select count(*) from (select * from ct1 union all select * from ct2)
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data00 != 4 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql select count(*) from (select * from ct1 union select * from ct2)
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data00 != 4 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
|
@ -29,6 +29,7 @@ class TDTestCase:
|
||||||
self.stbname = 'stb'
|
self.stbname = 'stb'
|
||||||
self.binary_length = 20 # the length of binary for column_dict
|
self.binary_length = 20 # the length of binary for column_dict
|
||||||
self.nchar_length = 20 # the length of nchar for column_dict
|
self.nchar_length = 20 # the length of nchar for column_dict
|
||||||
|
self.dbnames = ['db1', 'db2']
|
||||||
self.column_dict = {
|
self.column_dict = {
|
||||||
'ts': 'timestamp',
|
'ts': 'timestamp',
|
||||||
'col1': 'float',
|
'col1': 'float',
|
||||||
|
@ -57,21 +58,25 @@ class TDTestCase:
|
||||||
def create_user(self):
|
def create_user(self):
|
||||||
user_name = 'test'
|
user_name = 'test'
|
||||||
tdSql.execute(f'create user {user_name} pass "test"')
|
tdSql.execute(f'create user {user_name} pass "test"')
|
||||||
tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}')
|
tdSql.execute(f'grant read on {self.dbnames[0]}.{self.stbname} with t2 = "Beijing" to {user_name}')
|
||||||
|
tdSql.execute(f'grant write on {self.dbnames[1]}.{self.stbname} with t1 = 2 to {user_name}')
|
||||||
|
|
||||||
def prepare_data(self):
|
def prepare_data(self):
|
||||||
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
for db in self.dbnames:
|
||||||
for i in range(self.tbnum):
|
tdSql.execute(f"create database {db}")
|
||||||
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
tdSql.execute(f"use {db}")
|
||||||
for j in self.values_list:
|
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||||
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
for i in range(self.tbnum):
|
||||||
|
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||||
|
for j in self.values_list:
|
||||||
|
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||||
|
|
||||||
def user_privilege_check(self):
|
def user_read_privilege_check(self, dbname):
|
||||||
testconn = taos.connect(user='test', password='test')
|
testconn = taos.connect(user='test', password='test')
|
||||||
expectErrNotOccured = False
|
expectErrNotOccured = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql = "select count(*) from db.stb where t2 = 'Beijing'"
|
sql = f"select count(*) from {dbname}.stb where t2 = 'Beijing'"
|
||||||
res = testconn.query(sql)
|
res = testconn.query(sql)
|
||||||
data = res.fetch_all()
|
data = res.fetch_all()
|
||||||
count = data[0][0]
|
count = data[0][0]
|
||||||
|
@ -85,11 +90,30 @@ class TDTestCase:
|
||||||
tdLog.exit(f"{sql}, expect result doesn't match")
|
tdLog.exit(f"{sql}, expect result doesn't match")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def user_write_privilege_check(self, dbname):
|
||||||
|
testconn = taos.connect(user='test', password='test')
|
||||||
|
expectErrNotOccured = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
sql = f"insert into {dbname}.stb_1 values(now, 1.1, 200, 0.3)"
|
||||||
|
testconn.execute(sql)
|
||||||
|
except BaseException:
|
||||||
|
expectErrNotOccured = True
|
||||||
|
|
||||||
|
if expectErrNotOccured:
|
||||||
|
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured")
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
def user_privilege_error_check(self):
|
def user_privilege_error_check(self):
|
||||||
testconn = taos.connect(user='test', password='test')
|
testconn = taos.connect(user='test', password='test')
|
||||||
expectErrNotOccured = False
|
expectErrNotOccured = False
|
||||||
|
|
||||||
sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"]
|
sql_list = [f"alter talbe {self.dbnames[0]}.stb_1 set t2 = 'Wuhan'",
|
||||||
|
f"insert into {self.dbnames[0]}.stb_1 values(now, 1.1, 200, 0.3)",
|
||||||
|
f"drop table {self.dbnames[0]}.stb_1",
|
||||||
|
f"select count(*) from {self.dbnames[1]}.stb"]
|
||||||
|
|
||||||
for sql in sql_list:
|
for sql in sql_list:
|
||||||
try:
|
try:
|
||||||
|
@ -105,10 +129,10 @@ class TDTestCase:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
|
||||||
self.prepare_data()
|
self.prepare_data()
|
||||||
self.create_user()
|
self.create_user()
|
||||||
self.user_privilege_check()
|
self.user_read_privilege_check(self.dbnames[0])
|
||||||
|
self.user_write_privilege_check(self.dbnames[1])
|
||||||
self.user_privilege_error_check()
|
self.user_privilege_error_check()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
Loading…
Reference in New Issue