chore: solve conflicts of sql.c

This commit is contained in:
kailixu 2024-04-11 11:05:58 +08:00
commit 3738ce9e10
46 changed files with 4658 additions and 3836 deletions

View File

@ -29,21 +29,7 @@ typedef struct SCorEpSet {
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
#define EPSET_TO_STR(_eps, tbuf) \
do { \
int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \
for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \
if (_i == (_eps)->numOfEps - 1) { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} else { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} \
} \
len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \
} while (0);
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len);
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);

View File

@ -516,7 +516,6 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo;
STaskUpdateInfo updateInfo;
SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int64_t rid;

View File

@ -22,10 +22,10 @@
extern "C" {
#endif
int64_t taosStrHumanToInt64(const char* str);
int32_t taosStrHumanToInt64(const char* str, int64_t* out);
void taosInt64ToHumanStr(int64_t val, char* outStr);
int32_t taosStrHumanToInt32(const char* str);
int32_t taosStrHumanToInt32(const char* str, int32_t* out);
void taosInt32ToHumanStr(int32_t val, char* outStr);
#ifdef __cplusplus

View File

@ -56,6 +56,8 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
int32_t parseCfgReal(const char* str, double* out);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;
tMD5Init(&context);

View File

@ -496,6 +496,11 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (code != 0) {
pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
taosThreadMutexUnlock(&clientHbMgr.lock);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp);
return -1;
}
if (rspNum) {

View File

@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
}
}
void epAssign(SEp* pDst, SEp* pSrc) {
if (pSrc == NULL || pDst == NULL) {
return;
@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) {
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
pDst->port = pSrc->port;
}
void epsetSort(SEpSet* pDst) {
if (pDst->numOfEps <= 1) {
return;
@ -127,6 +129,34 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) {
return ep;
}
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) {
int len = snprintf(pBuf, bufLen, "epset:{");
if (len < 0) {
return -1;
}
for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) {
int32_t ret = 0;
if (_i == pEpSet->numOfEps - 1) {
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
} else {
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
}
if (ret < 0) {
return -1;
}
len += ret;
}
if (len < bufLen) {
/*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse);
}
return TSDB_CODE_SUCCESS;
}
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
SJson* pJson = tjsonCreateObject();
if (pJson == NULL) return -1;

View File

@ -837,7 +837,7 @@ _OVER:
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
"role:%s, redirect numOfEps:%d inUse:%d, type:%s",
pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
syncStr(state.restored), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));
syncStr(state.state), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));
if (epSet.numOfEps <= 0) return -1;

View File

@ -1747,7 +1747,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0};
EPSET_TO_STR(&pCurrent->epset, buf);
epsetToStr(&pCurrent->epset, buf, tListLen(buf));
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
@ -1898,7 +1899,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
taosArrayPush(plist, pEntry);
char buf[256] = {0};
EPSET_TO_STR(&pEntry->epset, buf);
epsetToStr(&pEntry->epset, buf, tListLen(buf));
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
}
taosHashCleanup(pHash);

View File

@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
}
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
entry.nodeId = SNODE_HANDLE;
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pObj);
@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
}
char buf[256] = {0};
EPSET_TO_STR(&epset, buf);
epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);

View File

@ -857,6 +857,58 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
return 0;
}
static bool mndTransActionsOfSameType(SArray *pActions) {
int32_t size = taosArrayGetSize(pActions);
ETrnAct lastActType = TRANS_ACTION_NULL;
bool same = true;
for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pActions, i);
if (i > 0) {
if (lastActType != pAction->actionType) {
same = false;
break;
}
}
lastActType = pAction->actionType;
}
return same;
}
static int32_t mndTransCheckParallelActions(SMnode *pMnode, STrans *pTrans) {
if (pTrans->exec == TRN_EXEC_PARALLEL) {
if (mndTransActionsOfSameType(pTrans->redoActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of parallel redo actions are not the same", pTrans->id);
return -1;
}
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (mndTransActionsOfSameType(pTrans->undoActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of parallel undo actions are not the same", pTrans->id);
return -1;
}
}
}
return 0;
}
static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, commit actions of non-changeless trans are empty", pTrans->id);
return -1;
}
if (mndTransActionsOfSameType(pTrans->commitActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of commit actions are not the same", pTrans->id);
return -1;
}
return 0;
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (pTrans == NULL) return -1;
@ -864,9 +916,11 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
if (mndTransCheckParallelActions(pMnode, pTrans) != 0) {
return -1;
}
if (mndTransCheckCommitActions(pMnode, pTrans) != 0) {
return -1;
}
@ -1283,24 +1337,25 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, topHalf:%d", pTrans->id, terrstr(), terrno,
topHalf);
}
return code;
}
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute undoActions since %s", terrstr());
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute undoActions since %s. topHalf:%d", pTrans->id, terrstr(), topHalf);
}
return code;
}
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute commitActions since %s", terrstr());
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute commitActions since %s. topHalf:%d", pTrans->id, terrstr(), topHalf);
}
return code;
}

View File

@ -30,11 +30,11 @@
extern "C" {
#endif
typedef struct SSnode {
struct SSnode {
char* path;
SStreamMeta* pMeta;
SMsgCb msgCb;
} SSnode;
};
#if 0
typedef struct {

View File

@ -923,13 +923,14 @@ int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
STaskId hId = pStreamTask->hTaskInfo.id;
SStreamTask* pTask = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId);
if (pTask == NULL) {
// todo handle error
tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t) hId.taskId);
return TSDB_CODE_SUCCESS;
}
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
streamMetaReleaseTask(pMeta, pTask);
return 0;
return TSDB_CODE_SUCCESS;
}
// this function should be executed by only one thread, so we set an sentinel to protect this function

View File

@ -311,7 +311,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
totalRows += pBlock->info.rows;
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);

View File

@ -501,6 +501,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
}
// extract the required source task for a given stream, identified by streamId
streamMetaRLock(pMeta);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
if (pId->streamId != streamId) {
@ -552,5 +556,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
walCloseReader(pReader);
}
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}

View File

@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
int32_t vgId = pMeta->vgId;
bool scanWal = false;
streamMetaWLock(pMeta);
if (pStartInfo->taskStarting == 1) {
@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
pStartInfo->restartCount = 0;
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
}
scanWal = true;
}
}
streamMetaWUnLock(pMeta);
if (scanWal && (vgId != SNODE_HANDLE)) {
tqDebug("vgId:%d start scan wal for executing tasks", vgId);
tqScanWalAsync(pMeta->ahandle, true);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -2136,6 +2136,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
}
pTableListInfo->oneTableForEachGroup = groupByTbname;
if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
pTableListInfo->oneTableForEachGroup = true;
}
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);

View File

@ -889,14 +889,15 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) {
STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo;
if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) {
pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED;
STableKeyInfo* pStart =
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
if (NULL == pStart) return NULL;
return getBlockForEmptyTable(pOperator, pStart);
}
} else { // group by tag + no sort
} else { // group by tag + no sort
int32_t numOfTables = tableListGetSize(pTableListInfo);
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
// get empty group, mark processed & rm from hash

View File

@ -2017,9 +2017,17 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (IS_STR_DATA_TYPE(para2Type)) {
para2Bytes -= VARSTR_HEADER_SIZE;
}
if (para2Bytes <= 0 || para2Bytes > 4096) { // cast dst var type length limits to 4096 bytes
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"CAST function converted length should be in range (0, 4096] bytes");
if (para2Bytes <= 0 ||
para2Bytes > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) { // cast dst var type length limits to 4096 bytes
if (TSDB_DATA_TYPE_NCHAR == para2Type) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"CAST function converted length should be in range (0, %d] NCHARS",
(TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE);
} else {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"CAST function converted length should be in range (0, %d] bytes",
TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE);
}
}
// add database precision as param

View File

@ -3201,15 +3201,15 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
prepareBuf(pCtx);
SWinKey key;
SWinKey key = {0};
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t skey = *(int64_t*)colDataGetData(pColInfo, rowIndex);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = skey;
SColumnInfoData* pColInfo = pCtx->input.pPTS;
if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
}
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);

View File

@ -420,6 +420,13 @@ type_name(A) ::= DECIMAL.
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_COMMA NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
%type type_name_default_len { SDataType }
%destructor type_name_default_len { }
type_name_default_len(A) ::= BINARY. { A = createVarLenDataType(TSDB_DATA_TYPE_BINARY, NULL); }
type_name_default_len(A) ::= NCHAR. { A = createVarLenDataType(TSDB_DATA_TYPE_NCHAR, NULL); }
type_name_default_len(A) ::= VARCHAR. { A = createVarLenDataType(TSDB_DATA_TYPE_VARCHAR, NULL); }
type_name_default_len(A) ::= VARBINARY. { A = createVarLenDataType(TSDB_DATA_TYPE_VARBINARY, NULL); }
%type tags_def_opt { SNodeList* }
%destructor tags_def_opt { nodesDestroyList($$); }
tags_def_opt(A) ::= . { A = NULL; }
@ -1118,6 +1125,9 @@ function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D).
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::=
CAST(B) NK_LP expr_or_subquery(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
function_expression(A) ::=
CAST(B) NK_LP expr_or_subquery(C) AS type_name_default_len(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
function_expression(A) ::= literal_func(B). { A = B; }
literal_func(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNode(pCxt, &B, NULL)); }

View File

@ -1607,7 +1607,10 @@ SDataType createDataType(uint8_t type) {
}
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = taosStr2Int32(pLen->z, NULL, 10)};
int32_t len = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) len /= TSDB_NCHAR_SIZE;
if(pLen) len = taosStr2Int32(pLen->z, NULL, 10);
SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = len};
return dt;
}

View File

@ -1572,11 +1572,13 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
case TSDB_DATA_TYPE_NCHAR: {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t len = 0;
char* pUcs4 = taosMemoryCalloc(1, pSchema->bytes - VARSTR_HEADER_SIZE);
int64_t realLen = pToken->n << 2;
if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
char* pUcs4 = taosMemoryMalloc(realLen);
if (NULL == pUcs4) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, pSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, realLen, &len)) {
taosMemoryFree(pUcs4);
if (errno == E2BIG) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);

View File

@ -2111,7 +2111,6 @@ static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFu
}
if (tsKeepColumnName && 1 == LIST_LENGTH(pFunc->pParameterList) && !pFunc->node.asAlias && !pFunc->node.asParam) {
strcpy(pFunc->node.userAlias, ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->userAlias);
strcpy(pFunc->node.aliasName, pFunc->node.userAlias);
}
return TSDB_CODE_SUCCESS;
}
@ -2703,6 +2702,29 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
static bool isTbnameFuction(SNode* pNode) {
return QUERY_NODE_FUNCTION == nodeType(pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pNode)->funcType;
}
static bool hasTbnameFunction(SNodeList* pPartitionByList) {
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartitionByList) {
if (isTbnameFuction(pPartKey)) {
return true;
}
}
return false;
}
static bool fromSubtable(SNode* table) {
if (NULL == table) return false;
if (table->type == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)table)->pMeta &&
((SRealTableNode*)table)->pMeta->tableType == TSDB_CHILD_TABLE) {
return true;
}
return false;
}
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
STranslateContext* pCxt = (STranslateContext*)pContext;
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
@ -2714,15 +2736,25 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
}
SNode* pGroupNode = NULL;
FOREACH(pGroupNode, getGroupByList(pCxt)) {
if (nodesEqualNode(getGroupByNode(pGroupNode), *pNode)) {
SNode* pActualNode = getGroupByNode(pGroupNode);
if (nodesEqualNode(pActualNode, *pNode)) {
return DEAL_RES_IGNORE_CHILD;
}
if (isTbnameFuction(pActualNode) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
SNode* pPartKey = NULL;
bool partionByTbname = hasTbnameFunction(pSelect->pPartitionByList);
FOREACH(pPartKey, pSelect->pPartitionByList) {
if (nodesEqualNode(pPartKey, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
if ((partionByTbname) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
if (NULL != pSelect->pWindow && QUERY_NODE_STATE_WINDOW == nodeType(pSelect->pWindow)) {
if (nodesEqualNode(((SStateWindowNode*)pSelect->pWindow)->pExpr, *pNode)) {
@ -2786,11 +2818,19 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
return DEAL_RES_IGNORE_CHILD;
}
SNode* pPartKey = NULL;
bool partionByTbname = false;
if (fromSubtable(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pFromTable) ||
hasTbnameFunction(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList)) {
partionByTbname = true;
}
FOREACH(pPartKey, ((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList) {
if (nodesEqualNode(pPartKey, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
}
if (partionByTbname && QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
pCxt->existCol = true;
}
@ -3961,22 +4001,12 @@ static int32_t checkStateExpr(STranslateContext* pCxt, SNode* pNode) {
return TSDB_CODE_SUCCESS;
}
static bool hasPartitionByTbname(SNodeList* pPartitionByList) {
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartitionByList) {
if (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) {
return true;
}
}
return false;
}
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pCxt->createStream) {
return TSDB_CODE_SUCCESS;
}
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList)) {
!hasTbnameFunction(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
return TSDB_CODE_SUCCESS;
@ -7612,12 +7642,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList);
!hasTbnameFunction(pSelect->pPartitionByList);
}
static bool crossTableWithUdaf(SSelectStmt* pSelect) {
return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList);
!hasTbnameFunction(pSelect->pPartitionByList);
}
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
@ -7875,7 +7905,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
&& !hasPartitionByTbname(pSelect->pPartitionByList)
&& !hasTbnameFunction(pSelect->pPartitionByList)
&& pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Event window for stream on super table must patitioned by table name");
@ -7903,7 +7933,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
if (pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_COUNT_WINDOW) {
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
&& !hasPartitionByTbname(pSelect->pPartitionByList) ) {
&& !hasTbnameFunction(pSelect->pPartitionByList) ) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Count window for stream on super table must patitioned by table name");
}

File diff suppressed because it is too large Load Diff

View File

@ -494,6 +494,9 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
} else if (pSelect->pPartitionByList) {
isCountByTag = !keysHasCol(pSelect->pPartitionByList);
}
if (pScan->tableType == TSDB_CHILD_TABLE) {
isCountByTag = true;
}
}
pScan->isCountByTag = isCountByTag;

View File

@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
startComplete_fn_t fn) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet);
taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);

View File

@ -119,7 +119,11 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
// add ref for task
SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
ASSERT(p != NULL);
if (p == NULL) {
stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId,
streamTaskGetStatus(pTask)->name);
return TSDB_CODE_SUCCESS;
}
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
@ -394,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
// halt it self for count window stream task until the related
// fill history task completd.
// halt it self for count window stream task until the related fill history task completed.
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);

View File

@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
EPSET_TO_STR(pEpSet, buf)
epsetToStr(pEpSet, buf, tListLen(buf));
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
}
@ -380,12 +380,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
if (pTask->hTaskInfo.pTimer != NULL) {
taosTmrStop(pTask->hTaskInfo.pTimer);
/*bool ret = */taosTmrStop(pTask->hTaskInfo.pTimer);
pTask->hTaskInfo.pTimer = NULL;
}
if (pTask->msgInfo.pTimer != NULL) {
taosTmrStop(pTask->msgInfo.pTimer);
/*bool ret = */taosTmrStop(pTask->msgInfo.pTimer);
pTask->msgInfo.pTimer = NULL;
}
@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
epsetToStr(pEpSet, buf, tListLen(buf));
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) {
@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
epsetToStr(pEpSet, buf, tListLen(buf));
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;
@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
bool ret = false;
// double check
taosThreadMutexLock(&pTask->lock);
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
taosThreadMutexLock(&pTask->lock);
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true;
}
taosThreadMutexUnlock(&pTask->lock);
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true;
}
taosThreadMutexUnlock(&pTask->lock);
return ret;
}

View File

@ -276,16 +276,18 @@ int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
int32_t ret = -1;
int32_t errcode = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
void* pHead = NULL;
int32_t contLen = 0;
SVArbSetAssignedLeaderReq req = {0};
if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
errcode = terrno;
goto _OVER;
}
int32_t errcode = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
if (ths->arbTerm > req.arbTerm) {
sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
ths->arbTerm, req.arbTerm);
@ -294,50 +296,58 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);
if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) == 0) {
if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
raftStoreNextTerm(ths);
if (terrno != TSDB_CODE_SUCCESS) {
sError("vgId:%d, failed to set next term since:%s", ths->vgId, terrstr());
goto _OVER;
}
syncNodeBecomeAssignedLeader(ths);
if (syncNodeAppendNoop(ths) < 0) {
sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, terrstr());
}
}
errcode = TSDB_CODE_SUCCESS;
} else {
if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
req.memberToken);
goto _OVER;
}
if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
terrno = TSDB_CODE_SUCCESS;
raftStoreNextTerm(ths);
if (terrno != TSDB_CODE_SUCCESS) {
sError("vgId:%d, failed to set next term since:%s", ths->vgId, terrstr());
errcode = terrno;
goto _OVER;
}
syncNodeBecomeAssignedLeader(ths);
if (syncNodeAppendNoop(ths) < 0) {
sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, terrstr());
}
}
SVArbSetAssignedLeaderRsp rsp = {0};
rsp.arbToken = req.arbToken;
rsp.memberToken = req.memberToken;
rsp.vgId = ths->vgId;
int32_t contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
if (contLen <= 0) {
sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
terrno = TSDB_CODE_OUT_OF_MEMORY;
errcode = terrno;
goto _OVER;
}
void* pHead = rpcMallocCont(contLen);
pHead = rpcMallocCont(contLen);
if (!pHead) {
sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
terrno = TSDB_CODE_OUT_OF_MEMORY;
errcode = terrno;
goto _OVER;
}
if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
terrno = TSDB_CODE_OUT_OF_MEMORY;
errcode = terrno;
rpcFreeCont(pHead);
goto _OVER;
}
errcode = TSDB_CODE_SUCCESS;
ret = 0;
_OVER:;
SRpcMsg rspMsg = {
.code = errcode,
.pCont = pHead,
@ -347,9 +357,6 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
tmsgSendRsp(&rspMsg);
ret = 0;
_OVER:
tFreeSVArbSetAssignedLeaderReq(&req);
return ret;
}

View File

@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
STransConnCtx* pCtx = pMsg->ctx;
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
pCtx->retryNextInterval);
return;
@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (hasEpSet) {
if (rpcDebugFlag & DEBUG_TRACE) {
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
}

View File

@ -70,7 +70,7 @@ int32_t transDecompressMsg(char** msg, int32_t len) {
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
STransMsgHead* pNewHead = (STransMsgHead*)buf;
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
@ -158,6 +158,10 @@ int transResetBuffer(SConnBuffer* connBuf) {
p->left = -1;
p->total = 0;
p->len = 0;
if (p->cap > BUFFER_CAP) {
p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, p->cap);
}
} else {
ASSERTS(0, "invalid read from sock buf");
return -1;

View File

@ -174,7 +174,9 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty
}
static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int32_t ival = taosStrHumanToInt32(value);
int32_t ival;
int32_t code = taosStrHumanToInt32(value, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
@ -188,7 +190,9 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st
}
static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int64_t ival = taosStrHumanToInt64(value);
int64_t ival;
int32_t code = taosStrHumanToInt64(value, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
@ -202,15 +206,16 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st
}
static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
float fval = (float)atof(value);
if (fval < pItem->fmin || fval > pItem->fmax) {
double dval;
int32_t code = parseCfgReal(value, &dval);
if (dval < pItem->fmin || dval > pItem->fmax) {
uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax);
cfgStypeStr(stype), dval, pItem->fmin, pItem->fmax);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
pItem->fval = fval;
pItem->fval = (float)dval;
pItem->stype = stype;
return 0;
}
@ -408,7 +413,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
}
} break;
case CFG_DTYPE_INT32: {
int32_t ival = (int32_t)taosStrHumanToInt32(pVal);
int32_t ival;
int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
@ -417,7 +424,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
}
} break;
case CFG_DTYPE_INT64: {
int64_t ival = (int64_t)taosStrHumanToInt64(pVal);
int64_t ival;
int32_t code = taosStrHumanToInt64(pVal, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
@ -427,9 +436,11 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
} break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: {
float fval = (float)atof(pVal);
if (fval < pItem->fmin || fval > pItem->fmax) {
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), fval,
double dval;
int32_t code = parseCfgReal(pVal, &dval);
if (code != TSDB_CODE_SUCCESS) return code;
if (dval < pItem->fmin || dval > pItem->fmax) {
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval,
pItem->fmin, pItem->fmax);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;

View File

@ -288,7 +288,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_IN_DROPPING, "Dnode in dropping sta
// mnode-trans
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid transaction stage")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background")

View File

@ -242,6 +242,7 @@ void taosCloseLog() {
taosMemoryFreeClear(tsLogObj.logHandle->buffer);
taosThreadMutexDestroy(&tsLogObj.logMutex);
taosMemoryFreeClear(tsLogObj.logHandle);
tsLogObj.logHandle = NULL;
}
}
@ -285,8 +286,11 @@ static void taosKeepOldLog(char *oldName) {
taosRemoveOldFiles(tsLogDir, tsLogKeepDays);
}
}
static void *taosThreadToOpenNewFile(void *param) {
typedef struct {
TdFilePtr pOldFile;
char keepName[LOG_FILE_NAME_LEN + 20];
} OldFileKeeper;
static OldFileKeeper *taosOpenNewFile() {
char keepName[LOG_FILE_NAME_LEN + 20];
sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag);
@ -312,13 +316,26 @@ static void *taosThreadToOpenNewFile(void *param) {
tsLogObj.logHandle->pFile = pFile;
tsLogObj.lines = 0;
tsLogObj.openInProgress = 0;
taosSsleep(20);
taosCloseLogByFd(pOldFile);
OldFileKeeper* oldFileKeeper = taosMemoryMalloc(sizeof(OldFileKeeper));
if (oldFileKeeper == NULL) {
uError("create old log keep info faild! mem is not enough.");
return NULL;
}
oldFileKeeper->pOldFile = pOldFile;
memcpy(oldFileKeeper->keepName, keepName, LOG_FILE_NAME_LEN + 20);
uInfo(" new log file:%d is opened", tsLogObj.flag);
uInfo("==================================");
taosKeepOldLog(keepName);
return oldFileKeeper;
}
static void *taosThreadToCloseOldFile(void* param) {
if(!param) return NULL;
OldFileKeeper* oldFileKeeper = (OldFileKeeper*)param;
taosSsleep(20);
taosCloseLogByFd(oldFileKeeper->pOldFile);
taosKeepOldLog(oldFileKeeper->keepName);
taosMemoryFree(oldFileKeeper);
return NULL;
}
@ -334,7 +351,8 @@ static int32_t taosOpenNewLogFile() {
taosThreadAttrInit(&attr);
taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_DETACHED);
taosThreadCreate(&thread, &attr, taosThreadToOpenNewFile, NULL);
OldFileKeeper* oldFileKeeper = taosOpenNewFile();
taosThreadCreate(&thread, &attr, taosThreadToCloseOldFile, oldFileKeeper);
taosThreadAttrDestroy(&attr);
}
@ -347,10 +365,11 @@ void taosResetLog() {
// force create a new log file
tsLogObj.lines = tsNumOfLogLines + 10;
taosOpenNewLogFile();
uInfo("==================================");
uInfo(" reset log file ");
if (tsLogObj.logHandle) {
taosOpenNewLogFile();
uInfo("==================================");
uInfo(" reset log file ");
}
}
static bool taosCheckFileIsOpen(char *logFileName) {

View File

@ -23,45 +23,74 @@
#define UNIT_ONE_PEBIBYTE (UNIT_ONE_TEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
#define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
int64_t taosStrHumanToInt64(const char* str) {
size_t sLen = strlen(str);
if (sLen < 2) return atoll(str);
int64_t val = 0;
char* strNoUnit = NULL;
char unit = str[sLen - 1];
if ((unit == 'P') || (unit == 'p')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_PEBIBYTE;
} else if ((unit == 'T') || (unit == 't')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_TEBIBYTE;
} else if ((unit == 'G') || (unit == 'g')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
} else if ((unit == 'M') || (unit == 'm')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
} else if ((unit == 'K') || (unit == 'k')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
} else {
val = atoll(str);
static int32_t parseCfgIntWithUnit(const char* str, double *res) {
double val, temp = INT64_MAX;
char* endPtr;
errno = 0;
val = taosStr2Int64(str, &endPtr, 0);
if (*endPtr == '.' || errno == ERANGE) {
errno = 0;
val = taosStr2Double(str, &endPtr);
}
if (endPtr == str || errno == ERANGE || isnan(val)) {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
while (isspace((unsigned char)*endPtr)) endPtr++;
uint64_t factor = 1;
if (*endPtr != '\0') {
switch (*endPtr) {
case 'P':
case 'p': {
temp /= UNIT_ONE_PEBIBYTE;
factor = UNIT_ONE_PEBIBYTE;
} break;
case 'T':
case 't': {
temp /= UNIT_ONE_TEBIBYTE;
factor = UNIT_ONE_TEBIBYTE;
} break;
case 'G':
case 'g': {
temp /= UNIT_ONE_GIBIBYTE;
factor = UNIT_ONE_GIBIBYTE;
} break;
case 'M':
case 'm': {
temp /= UNIT_ONE_MEBIBYTE;
factor = UNIT_ONE_MEBIBYTE;
} break;
case 'K':
case 'k': {
temp /= UNIT_ONE_KIBIBYTE;
factor = UNIT_ONE_KIBIBYTE;
} break;
default:
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
if ((val > 0 && val > temp) || (val < 0 && val < -temp)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
endPtr++;
val *= factor;
}
while (isspace((unsigned char)*endPtr)) endPtr++;
if (*endPtr) {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
val = rint(val);
*res = val;
return TSDB_CODE_SUCCESS;
}
taosMemoryFree(strNoUnit);
return val;
int32_t taosStrHumanToInt64(const char* str, int64_t *out) {
double res;
int32_t code = parseCfgIntWithUnit(str, &res);
if (code == TSDB_CODE_SUCCESS) *out = (int64_t)res;
return code;
}
#ifdef BUILD_NO_CALL
@ -83,35 +112,17 @@ void taosInt64ToHumanStr(int64_t val, char* outStr) {
}
#endif
int32_t taosStrHumanToInt32(const char* str) {
size_t sLen = strlen(str);
if (sLen < 2) return atoll(str);
int32_t val = 0;
char* strNoUnit = NULL;
char unit = str[sLen - 1];
if ((unit == 'G') || (unit == 'g')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
} else if ((unit == 'M') || (unit == 'm')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
} else if ((unit == 'K') || (unit == 'k')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
} else {
val = atoll(str);
int32_t taosStrHumanToInt32(const char* str, int32_t* out) {
double res;
int32_t code = parseCfgIntWithUnit(str, &res);
if (code == TSDB_CODE_SUCCESS) {
if (res < INT32_MIN || res > INT32_MAX) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
*out = (int32_t)res;
}
taosMemoryFree(strNoUnit);
return val;
return code;
}
#ifdef BUILD_NO_CALL

View File

@ -496,3 +496,21 @@ size_t twcsncspn(const TdUcs4 *wcs, size_t size, const TdUcs4 *reject, size_t rs
return index;
}
int32_t parseCfgReal(const char* str, double* out) {
double val;
char *endPtr;
errno = 0;
val = taosStr2Double(str, &endPtr);
if (str == endPtr || errno == ERANGE || isnan(val)) {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
while(isspace((unsigned char)*endPtr)) endPtr++;
if (*endPtr != '\0') {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
*out = val;
return TSDB_CODE_SUCCESS;
}

View File

@ -66,10 +66,10 @@ class TDTestCase:
tdSql.query('select * from (select tbname, avg(f) from st partition by tbname) a partition by a.tbname order by a.tbname');
tdSql.checkRows(2)
tdSql.checkCols(2)
tdSql.checkData(0, 0, 'ct1');
tdSql.checkData(0, 1, 6.0);
tdSql.checkData(1, 0, 'ct2');
tdSql.checkData(1, 1, 12.0);
tdSql.checkData(0, 0, 'ct1')
tdSql.checkData(0, 1, 6.0)
tdSql.checkData(1, 0, 'ct2')
tdSql.checkData(1, 1, 12.0)
tdSql.error('select tbname from (select * from st)')
tdSql.error('select st.tbname from (select st.tbname from st)')

View File

@ -870,7 +870,7 @@ sql_error select stddev(c2), tbname from select_tags_mt0;
sql_error select twa(c2), tbname from select_tags_mt0;
sql_error select interp(c2), tbname from select_tags_mt0 where ts=100001;
sql_error select t1,t2,tbname from select_tags_mt0 group by tbname;
sql select count(tbname) from select_tags_mt0 interval(1d);
sql select count(tbname) from select_tags_mt0 group by t1;
sql select count(tbname),SUM(T1) from select_tags_mt0 interval(1d);
@ -888,16 +888,16 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y);
print ==================================>TD-4231
sql select t1,tbname from select_tags_mt0 where c1<0
sql select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')
sql select tbname from select_tags_mt0 where tbname in ('select_tags_tb12');
sql_error select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
sql_error select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
#valid sql: select first(c1), t2 from select_tags_mt0 group by tbname;
sql select first(ts), tbname from select_tags_mt0 group by tbname;
sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
sql select count(*),tbname from select_tags_mt0 group by tbname
#sql select first(ts), tbname from select_tags_mt0 group by tbname;
#sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
#sql select count(*),tbname from select_tags_mt0 group by tbname
print ==================================> tag supported in group
sql select t1,t2,tbname from select_tags_mt0 group by tbname;
sql select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
sql select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
sql select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -3,15 +3,24 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database test;
sql create database test KEEP 36500;
sql use test;
sql create table st(ts timestamp, f int) tags(t int);
sql insert into ct1 using st tags(1) values(now, 0)(now+1s, 1)(now+2s, 10)(now+3s, 11)
sql insert into ct2 using st tags(2) values(now+2s, 2)(now+3s, 3)
sql insert into ct3 using st tags(3) values(now+4s, 4)(now+5s, 5)
sql insert into ct4 using st tags(4) values(now+6s, 6)(now+7s, 7)
sql select count(*), spread(ts) from st where tbname='ct1'
$ms = 1712135244502
$ms1 = $ms + 1000
$ms2 = $ms + 2000
$ms3 = $ms + 3000
$ms4 = $ms + 4000
$ms5 = $ms + 5000
$ms6 = $ms + 6000
$ms7 = $ms + 7000
sql insert into ct1 using st tags(1) values($ms , 0)($ms1 , 1)($ms2 , 10)($ms3 , 11)
sql insert into ct2 using st tags(2) values($ms2 , 2)($ms3 , 3)
sql insert into ct3 using st tags(3) values($ms4 , 4)($ms5 , 5)
sql insert into ct4 using st tags(4) values($ms6 , 6)($ms7 , 7)
sql select count(*), spread(ts) from st where tbname='ct1'
print $data00, $data01
if $data00 != @4@ then
return -1

View File

@ -15,6 +15,8 @@ sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1 from t1 state_window(a);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,2,3,1.1);
sql insert into t1 values(1648791215000,3,2,3,1.1);
@ -214,4 +216,232 @@ if $data[29][1] != 2 then
goto loop11
endi
print step2=============
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once ignore expired 0 ignore update 0 into streamt4 as select _wstart, first(a), b, c, ta, tb from st interval(1s);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t2 values(1648791215000,3,4,5,1.1);
sql insert into t2 values(1648791217000,4,5,6,1.1);
$loop_count = 0
loop12:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt4 order by 1;
sql select * from streamt4 order by 1;
if $rows != 4 then
print ======rows=$rows
goto loop12
endi
if $data02 != 2 then
print ======data02=$data02
goto loop12
endi
if $data03 != 3 then
print ======data03=$data03
goto loop12
endi
if $data04 != 1 then
print ======data04=$data04
goto loop12
endi
if $data05 != 1 then
print ======data05=$data05
goto loop12
endi
if $data22 != 4 then
print ======data22=$data22
goto loop12
endi
if $data23 != 5 then
print ======data23=$data23
goto loop12
endi
if $data24 != 2 then
print ======data24=$data24
goto loop12
endi
if $data25 != 2 then
print ======data25=$data25
goto loop12
endi
print step3=============
sql create database test5 vgroups 4;
sql use test5;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams5 trigger at_once ignore expired 0 ignore update 0 into streamt5 as select _wstart, b, c, ta, tb, max(b) from t1 interval(1s);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t1 values(1648791215000,3,4,5,1.1);
sql insert into t1 values(1648791217000,4,5,6,1.1);
$loop_count = 0
loop13:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt5 order by 1;
sql select * from streamt5 order by 1;
if $rows != 4 then
print ======rows=$rows
goto loop13
endi
if $data01 != 2 then
print ======data02=$data02
goto loop13
endi
if $data02 != 3 then
print ======data03=$data03
goto loop13
endi
if $data03 != 1 then
print ======data04=$data04
goto loop13
endi
if $data04 != 1 then
print ======data05=$data05
goto loop13
endi
if $data21 != 4 then
print ======data22=$data22
goto loop13
endi
if $data22 != 5 then
print ======data23=$data23
goto loop13
endi
if $data23 != 1 then
print ======data24=$data24
goto loop13
endi
if $data24 != 1 then
print ======data25=$data25
goto loop13
endi
print step4=============
sql create database test6 vgroups 4;
sql use test6;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t2 values(1648791215000,3,4,5,1.1);
sql insert into t2 values(1648791217000,4,5,6,1.1);
$loop_count = 0
loop14:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt6 order by 1;
sql select * from streamt6 order by 1;
if $rows != 4 then
print ======rows=$rows
goto loop14
endi
if $data01 != 2 then
print ======data02=$data02
goto loop14
endi
if $data02 != 3 then
print ======data03=$data03
goto loop14
endi
if $data04 != 1 then
print ======data04=$data04
goto loop14
endi
if $data05 != 1 then
print ======data05=$data05
goto loop14
endi
if $data21 != 4 then
print ======data22=$data22
goto loop14
endi
if $data22 != 5 then
print ======data23=$data23
goto loop14
endi
if $data24 != 2 then
print ======data24=$data24
goto loop14
endi
if $data25 != 2 then
print ======data25=$data25
goto loop14
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -240,6 +240,49 @@ class TDTestCase:
self.show_create_sysdb_sql()
self.show_create_systb_sql()
self.show_column_name()
self.test_show_variables()
def get_variable(self, name: str, local: bool = True):
if local:
sql = 'show local variables'
else:
sql = f'select `value` from information_schema.ins_dnode_variables where name like "{name}"'
tdSql.query(sql, queryTimes=1)
res = tdSql.queryResult
if local:
for row in res:
if row[0] == name:
return row[1]
else:
if len(res) > 0:
return res[0][0]
raise Exception(f"variable {name} not found")
def test_show_variables(self):
epsion = 0.0000001
var = 'minimalTmpDirGB'
expect_val: float = 10.11
sql = f'ALTER LOCAL "{var}" "{expect_val}"'
tdSql.execute(sql)
val: float = float(self.get_variable(var))
if val != expect_val:
tdLog.exit(f'failed to set local {var} to {expect_val} actually {val}')
error_vals = ['a', '10a', '', '1.100r', '1.12 r']
for error_val in error_vals:
tdSql.error(f'ALTER LOCAL "{var}" "{error_val}"')
var = 'supportVnodes'
expect_val = 1240 ## 1.211111 * 1024
sql = f'ALTER DNODE 1 "{var}" "1.211111k"'
tdSql.execute(sql, queryTimes=1)
val = int(self.get_variable(var, False))
if val != expect_val:
tdLog.exit(f'failed to set dnode {var} to {expect_val} actually {val}')
error_vals = ['a', '10a', '', '1.100r', '1.12 r', '5k']
for error_val in error_vals:
tdSql.error(f'ALTER DNODE 1 "{var}" "{error_val}"')
def stop(self):
tdSql.close()

View File

@ -79,6 +79,10 @@ class TDTestCase:
tdSql.query(f"select cast(c1 as binary(32)) as b from {self.dbname}.t1")
for i in range(len(data_t1_c1)):
tdSql.checkData( i, 0, str(data_t1_c1[i]) )
tdSql.query(f"select cast(c1 as binary) as b from {self.dbname}.t1")
for i in range(len(data_t1_c1)):
tdSql.checkData( i, 0, str(data_t1_c1[i]) )
tdLog.printNoPrefix("==========step6: cast int to nchar, expect changes to str(int) ")
@ -130,6 +134,13 @@ class TDTestCase:
tdSql.query(f"select cast(c2 as binary(32)) as b from {self.dbname}.t1")
for i in range(len(data_t1_c2)):
tdSql.checkData( i, 0, str(data_t1_c2[i]) )
tdSql.query(f"select cast(c2 as binary) as b from {self.dbname}.ct4")
for i in range(len(data_ct4_c2)):
tdSql.checkData( i, 0, str(data_ct4_c2[i]) )
tdSql.query(f"select cast(c2 as binary) as b from {self.dbname}.t1")
for i in range(len(data_t1_c2)):
tdSql.checkData( i, 0, str(data_t1_c2[i]) )
tdLog.printNoPrefix("==========step10: cast bigint to nchar, expect changes to str(int) ")
@ -184,6 +195,13 @@ class TDTestCase:
tdSql.query(f"select cast(c3 as binary(32)) as b from {self.dbname}.t1")
for i in range(len(data_t1_c3)):
tdSql.checkData( i, 0, str(data_t1_c3[i]) )
tdSql.query(f"select cast(c3 as binary) as b from {self.dbname}.ct4")
for i in range(len(data_ct4_c3)):
tdSql.checkData( i, 0, str(data_ct4_c3[i]) )
tdSql.query(f"select cast(c3 as binary) as b from {self.dbname}.t1")
for i in range(len(data_t1_c3)):
tdSql.checkData( i, 0, str(data_t1_c3[i]) )
tdLog.printNoPrefix("==========step14: cast smallint to nchar, expect changes to str(int) ")
@ -235,6 +253,13 @@ class TDTestCase:
tdSql.query(f"select cast(c4 as binary(32)) as b from {self.dbname}.t1")
for i in range(len(data_t1_c4)):
tdSql.checkData( i, 0, str(data_t1_c4[i]) )
tdSql.query(f"select cast(c4 as binary) as b from {self.dbname}.ct4")
for i in range(len(data_ct4_c4)):
tdSql.checkData( i, 0, str(data_ct4_c4[i]) )
tdSql.query(f"select cast(c4 as binary) as b from {self.dbname}.t1")
for i in range(len(data_t1_c4)):
tdSql.checkData( i, 0, str(data_t1_c4[i]) )
tdLog.printNoPrefix("==========step18: cast tinyint to nchar, expect changes to str(int) ")
@ -282,6 +307,12 @@ class TDTestCase:
for i in range(len(data_ct4_c5)):
tdSql.checkData( i, 0, str(data_ct4_c5[i]) ) if data_ct4_c5[i] is None else tdSql.checkData( i, 0, f'{data_ct4_c5[i]:.6f}' )
tdSql.query(f"select cast(c5 as binary(32)) as b from {self.dbname}.t1")
for i in range(len(data_t1_c5)):
tdSql.checkData( i, 0, str(data_t1_c5[i]) ) if data_t1_c5[i] is None else tdSql.checkData( i, 0, f'{data_t1_c5[i]:.6f}' )
tdSql.query(f"select cast(c5 as binary) as b from {self.dbname}.ct4")
for i in range(len(data_ct4_c5)):
tdSql.checkData( i, 0, str(data_ct4_c5[i]) ) if data_ct4_c5[i] is None else tdSql.checkData( i, 0, f'{data_ct4_c5[i]:.6f}' )
tdSql.query(f"select cast(c5 as binary) as b from {self.dbname}.t1")
for i in range(len(data_t1_c5)):
tdSql.checkData( i, 0, str(data_t1_c5[i]) ) if data_t1_c5[i] is None else tdSql.checkData( i, 0, f'{data_t1_c5[i]:.6f}' )
@ -290,6 +321,12 @@ class TDTestCase:
for i in range(len(data_ct4_c5)):
tdSql.checkData( i, 0, None ) if data_ct4_c5[i] is None else tdSql.checkData( i, 0, f'{data_ct4_c5[i]:.6f}' )
tdSql.query(f"select cast(c5 as nchar(32)) as b from {self.dbname}.t1")
for i in range(len(data_t1_c5)):
tdSql.checkData( i, 0, None ) if data_t1_c5[i] is None else tdSql.checkData( i, 0, f'{data_t1_c5[i]:.6f}' )
tdSql.query(f"select cast(c5 as nchar) as b from {self.dbname}.t1")
for i in range(len(data_t1_c5)):
tdSql.checkData( i, 0, None ) if data_t1_c5[i] is None else tdSql.checkData( i, 0, f'{data_t1_c5[i]:.6f}' )
tdSql.query(f"select cast(c5 as varchar) as b from {self.dbname}.t1")
for i in range(len(data_t1_c5)):
tdSql.checkData( i, 0, None ) if data_t1_c5[i] is None else tdSql.checkData( i, 0, f'{data_t1_c5[i]:.6f}' )
@ -580,6 +617,10 @@ class TDTestCase:
( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) )
tdSql.query(f"select cast(12121.23323131 + 'test~!@`#$%^&*(){'}'}{'{'}][;><.,' as binary(2)) as b from {self.dbname}.ct4")
( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) )
tdSql.query(f"select cast(12121.23323131 + 'test~!@`#$%^&*(){'}'}{'{'}][;><.,' as binary) as b from {self.dbname}.ct4")
( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) )
tdSql.query(f"select cast(12121.23323131 + 'test~!@`#$%^&*(){'}'}{'{'}][;><.,' as binary) as b from {self.dbname}.ct4")
( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) )
tdSql.query(f"select cast(12121.23323131 + 'test~!@`#$%^&*(){'}'}{'{'}][;><.,' as nchar(16)) as b from {self.dbname}.ct4")
( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) )
tdSql.query(f"select cast(12121.23323131 + 'test~!@`#$%^&*(){'}'}{'{'}][;><.,' as nchar(2)) as b from {self.dbname}.ct4")

View File

@ -103,6 +103,10 @@ class TDTestCase:
tdSql.checkRows(row)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
tdSql.checkRows(row)
tdSql.query(f'select t0, {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
tdSql.checkRows(row)
tdSql.query(f'select cast(t0 as binary(12)), {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
tdSql.checkRows(row)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by c1')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by t0')

View File

@ -470,7 +470,9 @@ class TDTestCase:
tdSql.checkRows(40)
# bug need fix
tdSql.query("select tbname , csum(c1), csum(c12) from db.stb1 partition by tbname")
tdSql.query("select tbname , st1, csum(c1), csum(c12) from db.stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select tbname , cast(st1 as binary(24)), csum(c1), csum(c12) from db.stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select tbname , csum(st1) from db.stb1 partition by tbname")
tdSql.checkRows(70)

View File

@ -91,15 +91,71 @@ class TDTestCase:
tdSql.query(f"select t2, t3, c1, count(*) from {self.dbname}.{self.stable} {keyword} by t2, t3, c1 ")
tdSql.checkRows(nonempty_tb_num * self.row_nums)
def test_groupby_sub_table(self):
for i in range(self.tb_nums):
tbname = f"{self.dbname}.sub_{self.stable}_{i}"
ts = self.ts + i*10000
tdSql.query(f"select t1, t2, t3,count(*) from {tbname}")
tdSql.checkRows(1)
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select cast(t2 as binary(12)),count(*) from {tbname}")
tdSql.checkRows(1)
tdSql.checkData(0, 0, i)
tdSql.query(f"select t2 + 1, count(*) from {tbname}")
tdSql.checkRows(1)
tdSql.checkData(0, 0, i + 1)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} group by tbname, c1, t4")
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} partition by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} partition by c1, tbname")
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {self.dbname}.{self.stable} partition by c1, tbname order by tbname desc")
tdSql.checkRows(50)
tdSql.checkData(0, 1, 4)
tdSql.checkData(0, 2, 40)
def test_multi_group_key(self, check_num, nonempty_tb_num):
# multi tag/tbname
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} group by t2, t3, tbname")
tdSql.checkRows(check_num)
tdSql.query(f"select cast(t2 as binary(12)), count(*) from {self.dbname}.{self.stable} group by t2, t3, tbname")
tdSql.checkRows(check_num)
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} partition by t2, t3, tbname")
tdSql.checkRows(check_num)
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} group by tbname order by tbname asc")
tdSql.checkRows(check_num)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 1, 20)
tdSql.checkData(3, 1, 30)
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} partition by tbname order by tbname asc")
tdSql.checkRows(check_num)
tdSql.checkData(0, 0, 0)
tdSql.checkData(2, 1, 20)
tdSql.checkData(3, 1, 30)
# multi tag + col
tdSql.query(f"select t1, t2, c1, count(*) from {self.dbname}.{self.stable} partition by t1, t2, c1 ")
tdSql.checkRows(nonempty_tb_num * self.row_nums)
@ -222,12 +278,14 @@ class TDTestCase:
self.test_groupby('group', self.tb_nums, nonempty_tb_num)
self.test_groupby('partition', self.tb_nums, nonempty_tb_num)
self.test_groupby_sub_table()
self.test_innerSelect(self.tb_nums)
self.test_multi_group_key(self.tb_nums, nonempty_tb_num)
self.test_multi_agg(self.tb_nums, nonempty_tb_num)
self.test_window(nonempty_tb_num)
self.test_event_window(nonempty_tb_num)
## test old version before changed
# self.test_groupby('group', 0, 0)
# self.insert_db(5, self.row_nums)

View File

@ -0,0 +1,130 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import string
import sys
import taos
from util.common import *
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.rowNum = 10
self.tbnum = 20
self.ts = 1537146000000
self.binary_str = 'taosdata'
self.nchar_str = '涛思数据'
def first_check_base(self):
dbname = "db"
tdSql.prepare(dbname)
column_dict = {
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': 'binary(20)',
'col13': 'nchar(20)'
}
tdSql.execute(f"alter local \'keepColumnName\' \'1\'")
tdSql.execute(f'''create table {dbname}.stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 tinyint unsigned, col6 smallint unsigned,
col7 int unsigned, col8 bigint unsigned, col9 float, col10 double, col11 bool, col12 binary(20), col13 nchar(20)) tags(loc nchar(20))''')
tdSql.execute(f"create table {dbname}.stb_1 using {dbname}.stb tags('beijing')")
tdSql.execute(f"create table {dbname}.stb_2 using {dbname}.stb tags('beijing')")
column_list = ['col1','col2','col3','col4','col5','col6','col7','col8','col9','col10','col11','col12','col13']
for i in column_list:
for j in ['stb_1']:
tdSql.query(f"select first({i}) from {dbname}.{j}")
tdSql.checkRows(0)
for n in range(self.rowNum):
i = n
tdSql.execute(f"insert into {dbname}.stb_1 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
for n in range(self.rowNum):
i = n + 10
tdSql.execute(f"insert into {dbname}.stb_1 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
for n in range(self.rowNum):
i = n + 100
tdSql.execute(f"insert into {dbname}.stb_2 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
for k, v in column_dict.items():
if v == 'tinyint' or v == 'smallint' or v == 'int' or v == 'bigint' or v == 'tinyint unsigned' or v == 'smallint unsigned'\
or v == 'int unsigned' or v == 'bigint unsigned':
tdSql.query(f"select last({k})-first({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109)
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 111)
tdSql.query(f"select max({k})-first({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109)
tdSql.query(f"select max({k})-min({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109)
tdSql.query(f"select last({k})-first({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19)
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 21)
tdSql.query(f"select max({k})-first({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19)
tdSql.query(f"select max({k})-min({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19)
# float,double
elif v == 'float' or v == 'double':
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109.2)
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19.2)
# bool
elif v == 'bool':
continue
# binary
elif 'binary' in v:
continue
# nchar
elif 'nchar' in v:
continue
#tdSql.execute(f'drop database {dbname}')
def run(self):
self.first_check_base()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())