This commit is contained in:
54liuyao 2024-01-19 10:39:11 +08:00
commit 5a7d72d6ef
59 changed files with 704 additions and 405 deletions

View File

@ -226,9 +226,10 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo
| Attribute | Description |
| ------------- | --------------------------------------------------------------------------------------------------------------- |
| Applicable | Client only |
| Meaning | When the Last, First, LastRow function is queried, whether the returned column name contains the function name. |
| Value Range | 0 means including the function name, 1 means not including the function name. |
| Default Value | 0 |
| Meaning | When the Last, First, and LastRow functions are queried and no alias is specified, the alias is automatically set to the column name (excluding the function name). Therefore, if the order by clause refers to the column name, it will automatically refer to the function corresponding to the column. |
| Value Range | 1 means automatically setting the alias to the column name (excluding the function name), 0 means not automatically setting the alias. |
| Default Value | 0 |
| Notes | When multiple of the above functions act on the same column at the same time and no alias is specified, if the order by clause refers to the column name, column selection ambiguous will occur because the aliases of multiple columns are the same. |
## Locale Parameters

View File

@ -215,9 +215,10 @@ taos -C
| 属性 | 说明 |
| -------- | ----------------------------------------------------------- |
| 适用范围 | 仅客户端适用 |
| 含义 | Last、First、LastRow 函数查询时,返回的列名是否包含函数名。 |
| 取值范围 | 0 表示包含函数名1 表示不包含函数名。 |
| 缺省值 | 0 |
| 含义 | Last、First、LastRow 函数查询且未指定别名时,自动设置别名为列名(不含函数名),因此 order by 子句如果引用了该列名将自动引用该列对应的函数 |
| 取值范围 | 1 表示自动设置别名为列名(不包含函数名), 0 表示不自动设置别名。 |
| 缺省值 | 0 |
| 补充说明 | 当同时出现多个上述函数作用于同一列且未指定别名时,如果 order by 子句引用了该列名,将会因为多列别名相同引发列选择冲突|
### countAlwaysReturnValue

View File

@ -32,8 +32,9 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);

View File

@ -249,6 +249,10 @@ typedef struct SPoint {
int32_t taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint *point1, SPoint *point2,
int32_t inputType);
#define LEASTSQUARES_DOUBLE_ITEM_LENGTH 25
#define LEASTSQUARES_BUFF_LENGTH 128
#define DOUBLE_PRECISION_DIGITS "16e"
#ifdef __cplusplus
}
#endif

View File

@ -156,6 +156,7 @@ typedef struct SProjectLogicNode {
SNodeList* pProjections;
char stmtName[TSDB_TABLE_NAME_LEN];
bool ignoreGroupId;
bool inputIgnoreGroup;
} SProjectLogicNode;
typedef struct SIndefRowsFuncLogicNode {
@ -451,6 +452,7 @@ typedef struct SProjectPhysiNode {
SNodeList* pProjections;
bool mergeDataBlock;
bool ignoreGroupId;
bool inputIgnoreGroup;
} SProjectPhysiNode;
typedef struct SIndefRowsFuncPhysiNode {

View File

@ -690,9 +690,9 @@ typedef struct STaskStatusEntry {
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int64_t activeCheckpointId; // current active checkpoint id
int64_t checkpointId; // current active checkpoint id
int32_t chkpointTransId; // checkpoint trans id
bool checkpointFailed; // denote if the checkpoint is failed or not
int8_t checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB
@ -875,6 +875,8 @@ void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
@ -887,6 +889,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
tmr_h streamTimerGetInstance();
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -744,6 +744,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_VIEW_QUERY TAOS_DEF_ERROR_CODE(0, 0x266C)
#define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D)
#define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E)
#define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -2374,7 +2374,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
sdbRelease(pSdb, pStream);
if (conflict) {
mWarn("nodeUpdate trans in progress, current nodeUpdate ignored");
mError("nodeUpdate conflict with other trans, current nodeUpdate ignored");
sdbCancelFetch(pSdb, pIter);
return -1;
}
@ -2587,14 +2587,22 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
// kill all trans in the dst DB
static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("start to clear checkpoints in all Dbs");
void *pIter = NULL;
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char *pDb = (char *)pIter;
size_t len = 0;
void *pKey = taosHashGetKey(pDb, &len);
char *p = strndup(pKey, len);
mDebug("clear checkpoint trans in Db:%s", p);
doKillCheckpointTrans(pMnode, pKey, len);
taosMemoryFree(p);
}
mDebug("complete clear checkpoints in Dbs");
}
// this function runs by only one thread, so it is not multi-thread safe
@ -2649,7 +2657,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
execInfo.pNodeList = pNodeSnapshot;
execInfo.ts = ts;
} else {
mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code));
mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
taosArrayDestroy(pNodeSnapshot);
}
} else {
@ -2844,6 +2852,8 @@ void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
mInfo("kill active transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
}
}
@ -2869,15 +2879,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS;
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire checkpoint trans:%d", transId);
}
killTransImpl(pMnode, transId, "");
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
if (pStream == NULL) {
@ -3033,7 +3035,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SStreamHbMsg req = {0};
bool checkpointFailed = false;
int64_t activeCheckpointId = 0;
int64_t checkpointId = 0;
int64_t streamId = 0;
int32_t transId = 0;
@ -3095,14 +3097,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) {
ASSERT(activeCheckpointId == p->activeCheckpointId);
if (p->checkpointId != 0) {
if (checkpointId != 0) {
ASSERT(checkpointId == p->checkpointId);
} else {
activeCheckpointId = p->activeCheckpointId;
checkpointId = p->checkpointId;
}
if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
checkpointFailed = p->checkpointFailed;
streamId = p->id.streamId;
transId = p->chkpointTransId;
@ -3124,17 +3129,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
if (checkpointFailed && checkpointId != 0) {
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", checkpointId);
mndResetStatusFromCheckpoint(pMnode, streamId, transId);
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
}
}
@ -3148,6 +3153,7 @@ void freeCheckpointCandEntry(void *param) {
SCheckpointCandEntry *pEntry = param;
taosMemoryFreeClear(pEntry->pName);
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;

View File

@ -89,7 +89,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
}
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;

View File

@ -145,8 +145,7 @@ FAIL:
}
int32_t sndInit(SSnode *pSnode) {
int32_t numOfTasks = 0;
tqStreamTaskResetStatus(pSnode->pMeta, &numOfTasks);
streamMetaResetTaskStatus(pSnode->pMeta);
streamMetaStartAllTasks(pSnode->pMeta);
return 0;
}

View File

@ -107,7 +107,6 @@ struct STQ {
TTB* pExecStore;
TTB* pCheckStore;
SStreamMeta* pStreamMeta;
void* tqTimer;
};
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);

View File

@ -26,18 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
static int32_t tqTimerInit(STQ* pTq) {
pTq->tqTimer = taosTmrInit(100, 100, 1000, "TQ");
if (pTq->tqTimer == NULL) {
return -1;
}
return 0;
}
static void tqTimerCleanUp(STQ* pTq) {
taosTmrCleanUp(pTq->tqTimer);
}
void tqDestroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
@ -118,7 +106,6 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
tqTimerInit(pTq);
return 0;
}
@ -149,7 +136,6 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta);
tqTimerCleanUp(pTq);
qDebug("end to close tq");
taosMemoryFree(pTq);

View File

@ -95,10 +95,14 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
pParam->pTq = pTq;
pParam->numOfTasks = numOfTasks;
tmr_h pTimer = streamTimerGetInstance();
ASSERT(pTimer);
if (pMeta->scanInfo.scanTimer == NULL) {
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTq->tqTimer);
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
} else {
taosTmrReset(doStartScanWal, idleDuration, pParam, pTq->tqTimer, &pMeta->scanInfo.scanTimer);
taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
}
return TSDB_CODE_SUCCESS;
@ -175,11 +179,11 @@ int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to stop tasks, code:%s", vgId, terrstr());
tqError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr());
return -1;
}
tqDebug("vgId:%d create msg to stop tasks", vgId);
tqDebug("vgId:%d create msg to stop all tasks async", vgId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;

View File

@ -103,8 +103,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", vgId,
req.taskId);
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped", vgId, req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
@ -124,6 +123,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId);
}
// duplicate update epset msg received, discard this redundant message
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
@ -135,7 +135,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
return rsp.code;
}
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
@ -159,11 +158,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaSaveTask(pMeta, *ppHTask);
}
#if 0
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
#endif
} else {
tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId);
}
@ -171,7 +165,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId);
streamTaskStop(pTask);
// keep the already handled info
// keep the already updated info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) {
@ -200,6 +194,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta);
} else {
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0;
@ -686,16 +684,16 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
return 0;
}
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) {
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
*numOfTasks = taosArrayGetSize(pMeta->pTaskList);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, *numOfTasks);
if (*numOfTasks == 0) {
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < (*numOfTasks); ++i) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
@ -754,8 +752,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
if (isLeader && !tsDisableStream) {
int32_t numOfTasks = 0;
tqStreamTaskResetStatus(pMeta, &numOfTasks);
streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta);
@ -965,8 +962,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?
if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
}
}
@ -990,4 +986,8 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
}
return code;
}
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
return taosArrayGetSize(pMeta->pTaskList);
}

View File

@ -941,7 +941,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
}
if(lastrowTmpIndexArray != NULL) {
mergeLastCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds);
mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds);
for(int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastrowTmpIndexArray, i), taosArrayGet(lastrowTmpColArray, i));
}

View File

@ -4120,8 +4120,10 @@ void tsdbReaderClose2(STsdbReader* pReader) {
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL;
void* p = pReader->pReadSnap;
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
tsdbUntakeReadSnap2(pReader, p, true);
}
tsem_destroy(&pReader->resumeAfterSuspend);
tsdbReleaseReader(pReader);
@ -4195,8 +4197,12 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
doSuspendCurrentReader(pReader);
}
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
pReader->pReadSnap = NULL;
// make sure only release once
void* p = pReader->pReadSnap;
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
tsdbUntakeReadSnap2(pReader, p, false);
}
if (pReader->bFilesetDelimited) {
pReader->status.memTableMinKey = INT64_MAX;
pReader->status.memTableMaxKey = INT64_MIN;

View File

@ -570,9 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
int32_t numOfTasks = 0;
tqStreamTaskResetStatus(pMeta, &numOfTasks);
int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta);
if (numOfTasks > 0) {
if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1;
@ -580,6 +578,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
pMeta->startInfo.restartCount);
} else {
pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
return;

View File

@ -200,6 +200,7 @@ typedef struct SExchangeInfo {
uint64_t self;
SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
char* pTaskId;
} SExchangeInfo;
typedef struct SScanInfo {
@ -272,7 +273,8 @@ typedef struct STableScanInfo {
SSampleExecInfo sample; // sample execution info
int32_t tableStartIndex; // current group scan start
int32_t tableEndIndex; // current group scan end
int32_t currentGroupIndex; // current group index of groupOffset
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
uint8_t countState; // empty table count state

View File

@ -260,14 +260,17 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
return TSDB_CODE_SUCCESS;
}
int32_t len = strlen(id) + 1;
pInfo->pTaskId = taosMemoryCalloc(1, len);
strncpy(pInfo->pTaskId, id, len);
for (int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = id;
dataInfo.taskId = pInfo->pTaskId;
dataInfo.index = i;
SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (pDs == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
@ -383,6 +386,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
tSimpleHashCleanup(pExInfo->pHashSources);
tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(pExInfo->pTaskId);
taosMemoryFreeClear(param);
}
@ -782,7 +787,7 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
if (pIdx->inUseIdx < 0) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo);
dataInfo.taskId = pExchangeInfo->pTaskId;
dataInfo.index = pIdx->srcIdx;
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
dataInfo.srcOpType = pBasicParam->srcOpType;

View File

@ -646,10 +646,6 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
}
}
if (initRemainGroups) {
pTableListInfo->numOfOuputGroups = taosHashGetSize(pTableListInfo->remainGroups);
}
if (tsTagFilterCache) {
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest),
@ -2142,6 +2138,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
pTableListInfo->numOfOuputGroups = numOfTables;
} else if (groupByTbname && pScanNode->groupOrderScan){
pTableListInfo->numOfOuputGroups = numOfTables;
} else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
pTableListInfo->numOfOuputGroups = numOfTables;
} else {
pTableListInfo->numOfOuputGroups = 1;
}
@ -2159,6 +2157,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
return code;
}
if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
if (groupSort || pScanNode->groupOrderScan) {
code = sortTableGroup(pTableListInfo);
}

View File

@ -1264,7 +1264,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
pScanInfo->tableEndIndex = 0;
pScanInfo->currentTable = 0;
} else {
taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id);
@ -1278,16 +1278,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->pTableScanOp->resultInfo.totalRows = 0;
// start from current accessed position
// we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start
// we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
// position, let's find it from the beginning.
index = tableListFind(pTableListInfo, uid, 0);
taosRUnLockLatch(&pTaskInfo->lock);
if (index >= 0) {
pScanInfo->tableEndIndex = index;
pScanInfo->currentTable = index;
} else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->tableEndIndex, id);
numOfTables, pScanInfo->currentTable, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
@ -1310,12 +1310,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
} else {
pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
}
// restore the key value

View File

@ -27,6 +27,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo limitInfo;
bool mergeDataBlocks;
SSDataBlock* pFinalRes;
bool inputIgnoreGroup;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
@ -109,7 +110,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pInfo->mergeDataBlocks = false;
} else {
@ -300,6 +302,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return pBlock;
}
if (pProjectInfo->inputIgnoreGroup) {
pBlock->info.id.groupId = 0;
}
int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;

View File

@ -657,33 +657,17 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) {
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
STableListInfo* pTableListInfo = pInfo->base.pTableListInfo;
int32_t numOfTables = tableListGetSize(pTableListInfo);
STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pTableListInfo, pInfo->tableStartIndex);
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
if (pTableListInfo->oneTableForEachGroup) {
pInfo->tableEndIndex = pInfo->tableStartIndex;
} else if (pTableListInfo->groupOffset) {
pInfo->currentGroupIndex++;
if (pInfo->currentGroupIndex + 1 < pTableListInfo->numOfOuputGroups) {
pInfo->tableEndIndex = pTableListInfo->groupOffset[pInfo->currentGroupIndex + 1] - 1;
} else {
pInfo->tableEndIndex = numOfTables - 1;
}
} else {
pInfo->tableEndIndex = numOfTables - 1;
}
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
if (!pInfo->needCountEmptyTable) {
pInfo->countState = TABLE_COUNT_STATE_END;
} else {
pInfo->countState = TABLE_COUNT_STATE_SCAN;
}
*pKeyInfo = pStart;
*size = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
}
void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
@ -939,7 +923,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->tableEndIndex + 1 >= numOfTables) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
if (pOperator->dynamicTask) {
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
@ -978,13 +962,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
int32_t num = 0;
STableKeyInfo* pList = NULL;
if (pInfo->tableEndIndex == -1) {
if (pInfo->currentGroupId == -1) {
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->tableEndIndex + 1 == numOfTables) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
initNextGroupScan(pInfo, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL);
@ -1034,7 +1019,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
if (pOperator->status == OP_EXEC_DONE) {
pInfo->tableEndIndex = -1;
pInfo->currentGroupId = -1;
pOperator->status = OP_OPENED;
SSDataBlock* result = NULL;
while (true) {
@ -1059,23 +1044,23 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
// if no data, switch to next table and continue scan
pInfo->tableEndIndex++;
pInfo->currentTable++;
taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->tableEndIndex >= numOfTables) {
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return NULL;
}
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex);
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
taosRUnLockLatch(&pTaskInfo->lock);
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo));
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
@ -1167,9 +1152,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->currentGroupId = -1;
pInfo->tableEndIndex = -1;
pInfo->currentGroupIndex = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
@ -1268,6 +1254,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
pTableScanInfo->base.cond.startVersion = 0;
pTableScanInfo->base.cond.endVersion = ver;
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
pTableScanInfo->tableEndIndex = -1;
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
@ -2222,7 +2209,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->pTableScanOp->status = OP_OPENED;
pTSInfo->scanTimes = 0;
pTSInfo->tableEndIndex = -1;
pTSInfo->currentGroupId = -1;
}
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {

View File

@ -1,6 +1,17 @@
//
// Created by slzhou on 22-4-20.
//
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_FNLOG_H
#define TDENGINE_FNLOG_H

View File

@ -952,7 +952,7 @@ static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t le
}
}
pFunc->node.resType = (SDataType){.bytes = 64, .type = TSDB_DATA_TYPE_BINARY};
pFunc->node.resType = (SDataType){.bytes = LEASTSQUARES_BUFF_LENGTH, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}

View File

@ -1576,9 +1576,19 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
param12 /= param[1][1];
char buf[512] = {0};
char buf[LEASTSQUARES_BUFF_LENGTH] = {0};
char slopBuf[64] = {0};
char interceptBuf[64] = {0};
int n = snprintf(slopBuf, 64, "%.6lf", param02);
if (n > LEASTSQUARES_DOUBLE_ITEM_LENGTH) {
snprintf(slopBuf, 64, "%." DOUBLE_PRECISION_DIGITS, param02);
}
n = snprintf(interceptBuf, 64, "%.6lf", param12);
if (n > LEASTSQUARES_DOUBLE_ITEM_LENGTH) {
snprintf(interceptBuf, 64, "%." DOUBLE_PRECISION_DIGITS, param12);
}
size_t len =
snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param02, param12);
snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%s, intercept:%s}", slopBuf, interceptBuf);
varDataSetLen(buf, len);
colDataSetVal(pCol, currentRow, buf, pResInfo->isNullRes);

View File

@ -492,6 +492,7 @@ static int32_t logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode
CLONE_NODE_LIST_FIELD(pProjections);
COPY_CHAR_ARRAY_FIELD(stmtName);
COPY_SCALAR_FIELD(ignoreGroupId);
COPY_SCALAR_FIELD(inputIgnoreGroup);
return TSDB_CODE_SUCCESS;
}
@ -735,6 +736,15 @@ static int32_t physiPartitionCopy(const SPartitionPhysiNode* pSrc, SPartitionPhy
return TSDB_CODE_SUCCESS;
}
static int32_t physiProjectCopy(const SProjectPhysiNode* pSrc, SProjectPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
CLONE_NODE_LIST_FIELD(pProjections);
COPY_SCALAR_FIELD(mergeDataBlock);
COPY_SCALAR_FIELD(ignoreGroupId);
COPY_SCALAR_FIELD(inputIgnoreGroup);
return TSDB_CODE_SUCCESS;
}
static int32_t dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) {
COPY_SCALAR_FIELD(dataBlockId);
CLONE_NODE_LIST_FIELD(pSlots);
@ -969,6 +979,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
code = physiPartitionCopy((const SPartitionPhysiNode*)pNode, (SPartitionPhysiNode*)pDst);
break;
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
code = physiProjectCopy((const SProjectPhysiNode*)pNode, (SProjectPhysiNode*)pDst);
break;
default:
break;
}

View File

@ -790,6 +790,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
static const char* jkProjectLogicPlanProjections = "Projections";
static const char* jkProjectLogicPlanIgnoreGroupId = "IgnoreGroupId";
static const char* jkProjectLogicPlanInputIgnoreGroup= "InputIgnoreGroup";
static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
const SProjectLogicNode* pNode = (const SProjectLogicNode*)pObj;
@ -801,6 +802,9 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanIgnoreGroupId, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanInputIgnoreGroup, pNode->inputIgnoreGroup);
}
return code;
}
@ -815,7 +819,9 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectLogicPlanIgnoreGroupId, &pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectLogicPlanInputIgnoreGroup, &pNode->inputIgnoreGroup);
}
return code;
}
@ -2073,6 +2079,7 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
static const char* jkProjectPhysiPlanProjections = "Projections";
static const char* jkProjectPhysiPlanMergeDataBlock = "MergeDataBlock";
static const char* jkProjectPhysiPlanIgnoreGroupId = "IgnoreGroupId";
static const char* jkProjectPhysiPlanInputIgnoreGroup = "InputIgnoreGroup";
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj;
@ -2087,7 +2094,9 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanIgnoreGroupId, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanInputIgnoreGroup, pNode->inputIgnoreGroup);
}
return code;
}
@ -2104,7 +2113,9 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanIgnoreGroupId, &pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanInputIgnoreGroup, &pNode->inputIgnoreGroup);
}
return code;
}

View File

@ -2367,7 +2367,8 @@ enum {
PHY_PROJECT_CODE_BASE_NODE = 1,
PHY_PROJECT_CODE_PROJECTIONS,
PHY_PROJECT_CODE_MERGE_DATA_BLOCK,
PHY_PROJECT_CODE_IGNORE_GROUP_ID
PHY_PROJECT_CODE_IGNORE_GROUP_ID,
PHY_PROJECT_CODE_INPUT_IGNORE_GROUP
};
static int32_t physiProjectNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2383,6 +2384,9 @@ static int32_t physiProjectNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_INPUT_IGNORE_GROUP, pNode->inputIgnoreGroup);
}
return code;
}
@ -2406,6 +2410,9 @@ static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_PROJECT_CODE_IGNORE_GROUP_ID:
code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId);
break;
case PHY_PROJECT_CODE_INPUT_IGNORE_GROUP:
code = tlvDecodeBool(pTlv, &pNode->inputIgnoreGroup);
break;
default:
break;
}

View File

@ -1085,21 +1085,31 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod
static EDealRes translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** pCol, bool* pFound) {
SNodeList* pProjectionList = getProjectListFromCurrStmt(pCxt->pCurrStmt);
SNode* pNode;
SNode* pFoundNode = NULL;
*pFound = false;
FOREACH(pNode, pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp((*pCol)->colName, pExpr->userAlias)) {
SNode* pNew = nodesCloneNode(pNode);
if (NULL == pNew) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
if (true == *pFound) {
if(nodesEqualNode(pFoundNode, pNode)) {
continue;
}
nodesDestroyNode(*(SNode**)pCol);
*(SNode**)pCol = (SNode*)pNew;
*pFound = true;
return DEAL_RES_CONTINUE;
}
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ORDERBY_AMBIGUOUS, (*pCol)->colName);
return DEAL_RES_ERROR;
}
*pFound = true;
pFoundNode = pNode;
}
}
if (*pFound) {
SNode* pNew = nodesCloneNode(pFoundNode);
if (NULL == pNew) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*(SNode**)pCol);
*(SNode**)pCol = (SNode*)pNew;
}
*pFound = false;
return DEAL_RES_CONTINUE;
}
@ -4510,13 +4520,12 @@ typedef struct SReplaceOrderByAliasCxt {
static EDealRes replaceOrderByAliasImpl(SNode** pNode, void* pContext) {
SReplaceOrderByAliasCxt* pCxt = pContext;
SNodeList* pProjectionList = pCxt->pProjectionList;
SNode* pProject = NULL;
SNodeList* pProjectionList = pCxt->pProjectionList;
SNode* pProject = NULL;
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
FOREACH(pProject, pProjectionList) {
SExprNode* pExpr = (SExprNode*)pProject;
if (0 == strcmp(((SColumnRefNode*)*pNode)->colName, pExpr->userAlias)
&& nodeType(*pNode) == nodeType(pProject)) {
if (0 == strcmp(((SColumnRefNode*)*pNode)->colName, pExpr->userAlias) && nodeType(*pNode) == nodeType(pProject)) {
SNode* pNew = nodesCloneNode(pProject);
if (NULL == pNew) {
pCxt->pTranslateCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
@ -4530,14 +4539,14 @@ static EDealRes replaceOrderByAliasImpl(SNode** pNode, void* pContext) {
}
} else if (QUERY_NODE_ORDER_BY_EXPR == nodeType(*pNode)) {
STranslateContext* pTransCxt = pCxt->pTranslateCxt;
SNode* pExpr = ((SOrderByExprNode*)*pNode)->pExpr;
if (QUERY_NODE_VALUE == nodeType(pExpr)) {
SNode* pExpr = ((SOrderByExprNode*)*pNode)->pExpr;
if (QUERY_NODE_VALUE == nodeType(pExpr)) {
SValueNode* pVal = (SValueNode*)pExpr;
if (DEAL_RES_ERROR == translateValue(pTransCxt, pVal)) {
return pTransCxt->errCode;
}
int32_t pos = getPositionValue(pVal);
if ( 0 < pos && pos <= LIST_LENGTH(pProjectionList)) {
if (0 < pos && pos <= LIST_LENGTH(pProjectionList)) {
SNode* pNew = nodesCloneNode(nodesListGetNode(pProjectionList, pos - 1));
if (NULL == pNew) {
pCxt->pTranslateCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -190,6 +190,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "invalid ip range";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS:
return "ORDER BY \"%s\" is ambiguous";
default:
return "Unknown error";
}

View File

@ -3122,6 +3122,7 @@ static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
NULL != pChild->pConditions || NULL != pChild->pLimit || NULL != pChild->pSlimit) {
return false;
}
return true;
}
@ -3160,7 +3161,9 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
if (((SProjectLogicNode*)pChild)->ignoreGroupId) {
((SProjectLogicNode*)pSelfNode)->inputIgnoreGroup = true;
}
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
int32_t code = cxt.errCode;
@ -3325,7 +3328,11 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
}
case QUERY_NODE_LOGIC_PLAN_SCAN:
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && pNodeWithLimit->pLimit) {
swapLimit(pNodeWithLimit, pNodeLimitPushTo);
if (((SProjectLogicNode*)pNodeWithLimit)->inputIgnoreGroup) {
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT);
} else {
swapLimit(pNodeWithLimit, pNodeLimitPushTo);
}
return true;
}
default:

View File

@ -1459,6 +1459,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode);
pProject->ignoreGroupId = pProjectLogicNode->ignoreGroupId;
pProject->inputIgnoreGroup = pProjectLogicNode->inputIgnoreGroup;
int32_t code = TSDB_CODE_SUCCESS;
if (0 == LIST_LENGTH(pChildren)) {

View File

@ -1178,8 +1178,10 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) {
*pSplitNode = pInfo->pSplitNode;
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
if (NULL != pInfo->pSplitNode->pParent &&
QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit &&
!((SProjectLogicNode*)pInfo->pSplitNode->pParent)->inputIgnoreGroup) {
*pSplitNode = pInfo->pSplitNode->pParent;
if (NULL != pInfo->pSplitNode->pLimit) {
(*pSplitNode)->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit);

View File

@ -2427,9 +2427,19 @@ int32_t leastSQRScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPa
matrix12 /= matrix[1][1];
char buf[64] = {0};
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", matrix02,
matrix12);
char buf[LEASTSQUARES_BUFF_LENGTH] = {0};
char slopBuf[64] = {0};
char interceptBuf[64] = {0};
int n = snprintf(slopBuf, 64, "%.6lf", matrix02);
if (n > LEASTSQUARES_DOUBLE_ITEM_LENGTH) {
snprintf(slopBuf, 64, "%." DOUBLE_PRECISION_DIGITS, matrix02);
}
n = snprintf(interceptBuf, 64, "%.6lf", matrix12);
if (n > LEASTSQUARES_DOUBLE_ITEM_LENGTH) {
snprintf(interceptBuf, 64, "%." DOUBLE_PRECISION_DIGITS, matrix12);
}
size_t len =
snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%s, intercept:%s}", slopBuf, interceptBuf);
varDataSetLen(buf, len);
colDataSetVal(pOutputData, 0, buf, false);
}

View File

@ -48,8 +48,8 @@ extern "C" {
#define stError(...) do { if (stDebugFlag & DEBUG_ERROR) { taosPrintLog("STM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define stWarn(...) do { if (stDebugFlag & DEBUG_WARN) { taosPrintLog("STM WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define stInfo(...) do { if (stDebugFlag & DEBUG_INFO) { taosPrintLog("STM ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define stDebug(...) do { if (stDebugFlag & DEBUG_DEBUG) { taosPrintLog("STM ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define stDebug(...) do { if (stDebugFlag & DEBUG_DEBUG) { taosPrintLog("STM ", DEBUG_DEBUG, stDebugFlag, __VA_ARGS__); }} while(0)
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct {

View File

@ -35,6 +35,10 @@ void streamTimerCleanUp() {
streamTimer = NULL;
}
tmr_h streamTimerGetInstance() {
return streamTimer;
}
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);

View File

@ -500,34 +500,27 @@ int32_t backendCopyFiles(char* src, char* dst) {
// return 0;
}
int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = -1;
int32_t len = strlen(defaultPath) + 32;
char* tmp = taosMemoryCalloc(1, len);
sprintf(tmp, "%s%s", defaultPath, "_tmp");
if (taosIsDir(tmp)) taosRemoveDir(tmp);
if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp);
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
if (taosIsDir(tmp)) {
taosRemoveDir(tmp);
}
int32_t code = 0;
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
taosMkDir(defaultPath);
stInfo("succ to clear stream backend %s", defaultPath);
}
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
code = backendCopyFiles(chkpPath, defaultPath);
if (code != 0) {
stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
taosRemoveDir(defaultPath);
taosMkDir(defaultPath);
stError("failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
code = 0;
} else {
stInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
}
}
if (code != 0) {
if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath);
if (taosIsDir(tmp)) taosRenameFile(tmp, defaultPath);
} else {
taosRemoveDir(tmp);
}
taosMemoryFree(tmp);
return code;
}

View File

@ -360,6 +360,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
}
int32_t getChkpMeta(char* id, char* path, SArray* list) {

View File

@ -960,7 +960,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
}
@ -999,8 +999,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
entry.id.taskId = taskId;
@ -1113,8 +1113,8 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
}
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0;
entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId;
if (entry.checkpointFailed) {
@ -1373,7 +1373,6 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId);
} else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
}
@ -1411,31 +1410,43 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
}
}
static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) {
streamMetaWLock(pMeta);
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
return pTaskList;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
stInfo("vgId:%d start tasks completed", pMeta->vgId);
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
int64_t now = taosGetTimestampMs();
SArray* pTaskList = prepareBeforeStartTasks(pMeta);
numOfTasks = taosArrayGetSize(pTaskList);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
// todo: may be we should find the related fill-history task and set it failed.
// todo: use hashTable instead
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
@ -1443,8 +1454,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
continue;
}
// todo: may be we should find the related fill-history task and set it failed.
// fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) {
@ -1491,10 +1500,13 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) {
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampMs();
// send hb msg to mnode before closing all tasks.
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
int32_t numOfTasks = taosArrayGetSize(pTaskList);
@ -1512,6 +1524,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
taosArrayDestroy(pTaskList);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
streamMetaRUnLock(pMeta);
return 0;
}
@ -1638,4 +1653,23 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
}
return TSDB_CODE_SUCCESS;
}
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
}
return 0;
}

View File

@ -810,7 +810,7 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->verEnd = pSrc->verEnd;
pDst->sinkQuota = pSrc->sinkQuota;
pDst->sinkDataSize = pSrc->sinkDataSize;
pDst->activeCheckpointId = pSrc->activeCheckpointId;
pDst->checkpointId = pSrc->checkpointId;
pDst->checkpointFailed = pSrc->checkpointFailed;
pDst->chkpointTransId = pSrc->chkpointTransId;
}

View File

@ -676,7 +676,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
// heartbeat timeout
if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
return -1;
}

View File

@ -345,9 +345,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
ASSERT(pBlk && !pBlk->acked);
ASSERT(pBlk);
int64_t nowMs = taosGetTimestampMs();
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
}
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {

View File

@ -101,8 +101,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")

View File

@ -119,6 +119,8 @@ class TDTestCase(TBase):
# stop taosd test taos as server
sc.dnodeStop(idx)
etool.exeBinFile("taos", f'-n server', wait=False)
time.sleep(3)
eos.exe("pkill -9 taos")
# run
def run(self):

View File

@ -52,13 +52,14 @@ class TDTestCase(TBase):
tdLog.info(f"do query.")
# __group_key
sql = f"select count(*),_group_key(uti),uti from {self.stb} partition by uti;"
tdSql.execute(sql)
tdSql.checkRows(251)
sql = f"select count(*),_group_key(uti),uti from {self.stb} partition by uti"
tdSql.query(sql)
# column index 1 value same with 2
tdSql.checkSameColumn(1, 2)
sql = f"select count(*),_group_key(usi) from {self.stb} group by usi;"
tdSql.execute(sql)
tdSql.checkRows(997)
sql = f"select count(*),_group_key(usi),usi from {self.stb} group by usi limit 100;"
tdSql.query(sql)
tdSql.checkSameColumn(1, 2)
# tail
sql1 = "select ts,ui from d0 order by ts desc limit 5 offset 2;"

View File

@ -193,10 +193,10 @@ class TBase:
# sql
rows1 = tdSql.query(sql1,queryTimes=2)
res1 = copy.deepcopy(tdSql.queryResult)
res1 = copy.deepcopy(tdSql.res)
tdSql.query(sql2,queryTimes=2)
res2 = tdSql.queryResult
res2 = tdSql.res
rowlen1 = len(res1)
rowlen2 = len(res2)

View File

@ -795,7 +795,7 @@ class TDCom:
def getOneRow(self, location, containElm):
res_list = list()
if 0 <= location < tdSql.queryRows:
for row in tdSql.queryResult:
for row in tdSql.res:
if row[location] == containElm:
res_list.append(row)
return res_list
@ -943,7 +943,7 @@ class TDCom:
"""drop all streams
"""
tdSql.query("show streams")
stream_name_list = list(map(lambda x: x[0], tdSql.queryResult))
stream_name_list = list(map(lambda x: x[0], tdSql.res))
for stream_name in stream_name_list:
tdSql.execute(f'drop stream if exists {stream_name};')
@ -962,7 +962,7 @@ class TDCom:
"""drop all databases
"""
tdSql.query("show databases;")
db_list = list(map(lambda x: x[0], tdSql.queryResult))
db_list = list(map(lambda x: x[0], tdSql.res))
for dbname in db_list:
if dbname not in self.white_list and "telegraf" not in dbname:
tdSql.execute(f'drop database if exists `{dbname}`')
@ -1412,7 +1412,7 @@ class TDCom:
input_function (str): scalar
"""
tdSql.query(sql)
res = tdSql.queryResult
res = tdSql.res
if input_function in ["acos", "asin", "atan", "cos", "log", "pow", "sin", "sqrt", "tan"]:
tdSql.checkEqual(res[1][1], "DOUBLE")
tdSql.checkEqual(res[2][1], "DOUBLE")
@ -1490,7 +1490,7 @@ class TDCom:
bigint: bigint-ts
"""
tdSql.query(f'select cast({str_ts} as bigint)')
return tdSql.queryResult[0][0]
return tdSql.res[0][0]
def cast_query_data(self, query_data):
"""cast query-result for existed-stb
@ -1514,7 +1514,7 @@ class TDCom:
tdSql.query(f'select cast("{v}" as binary(6))')
else:
tdSql.query(f'select cast("{v}" as {" ".join(col_tag_type_list[i].strip().split(" ")[1:])})')
query_data_l[i] = tdSql.queryResult[0][0]
query_data_l[i] = tdSql.res[0][0]
else:
query_data_l[i] = v
nl.append(tuple(query_data_l))
@ -1566,9 +1566,9 @@ class TDCom:
if tag_value_list:
dvalue = len(self.tag_type_str.split(',')) - defined_tag_count
tdSql.query(sql1)
res1 = tdSql.queryResult
res1 = tdSql.res
tdSql.query(sql2)
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
res2 = self.cast_query_data(tdSql.res) if tag_value_list or use_exist_stb else tdSql.res
tdSql.sql = sql1
new_list = list()
if tag_value_list:
@ -1601,10 +1601,10 @@ class TDCom:
tdLog.info("query retrying ...")
new_list = list()
tdSql.query(sql1)
res1 = tdSql.queryResult
res1 = tdSql.res
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
# res2 = tdSql.res
res2 = self.cast_query_data(tdSql.res) if tag_value_list or use_exist_stb else tdSql.res
tdSql.sql = sql1
if tag_value_list:
@ -1643,10 +1643,10 @@ class TDCom:
tdLog.info("query retrying ...")
new_list = list()
tdSql.query(sql1)
res1 = tdSql.queryResult
res1 = tdSql.res
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
# res2 = tdSql.res
res2 = self.cast_query_data(tdSql.res) if tag_value_list or use_exist_stb else tdSql.res
tdSql.sql = sql1
if tag_value_list:

View File

@ -86,10 +86,9 @@ class ConfigureyCluster:
count=0
while count < 5:
tdSql.query("select * from information_schema.ins_dnodes")
# tdLog.debug(tdSql.queryResult)
status=0
for i in range(self.dnodeNums):
if tdSql.queryResult[i][4] == "ready":
if tdSql.res[i][4] == "ready":
status+=1
# tdLog.debug(status)

View File

@ -78,6 +78,107 @@ class TDSql:
self.cursor.execute(s)
time.sleep(2)
#
# do execute
#
def query(self, sql, row_tag=None, queryTimes=10, count_expected_res=None):
self.sql = sql
i=1
while i <= queryTimes:
try:
self.cursor.execute(sql)
self.res = self.cursor.fetchall()
self.queryRows = len(self.res)
self.queryCols = len(self.cursor.description)
if count_expected_res is not None:
counter = 0
while count_expected_res != self.res[0][0]:
self.cursor.execute(sql)
self.res = self.cursor.fetchall()
if counter < queryTimes:
counter += 0.5
time.sleep(0.5)
else:
return False
if row_tag:
return self.res
return self.queryRows
except Exception as e:
tdLog.notice("Try to query again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
def executeTimes(self, sql, times):
for i in range(times):
try:
return self.cursor.execute(sql)
except BaseException:
time.sleep(1)
continue
def execute(self, sql, queryTimes=30, show=False):
self.sql = sql
if show:
tdLog.info(sql)
i=1
while i <= queryTimes:
try:
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
except Exception as e:
tdLog.notice("Try to execute sql again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
# execute many sql
def executes(self, sqls, queryTimes=30, show=False):
for sql in sqls:
self.execute(sql, queryTimes, show)
def waitedQuery(self, sql, expectRows, timeout):
tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout))
self.sql = sql
try:
for i in range(timeout):
self.cursor.execute(sql)
self.res = self.cursor.fetchall()
self.queryRows = len(self.res)
self.queryCols = len(self.cursor.description)
tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows))
if self.queryRows >= expectRows:
return (self.queryRows, i)
time.sleep(1)
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return (self.queryRows, timeout)
def is_err_sql(self, sql):
err_flag = True
try:
self.cursor.execute(sql)
except BaseException:
err_flag = False
return False if err_flag else True
def error(self, sql, expectedErrno = None, expectErrInfo = None):
caller = inspect.getframeinfo(inspect.stack()[1][0])
expectErrNotOccured = True
@ -95,7 +196,7 @@ class TDSql:
else:
self.queryRows = 0
self.queryCols = 0
self.queryResult = None
self.res = None
if expectedErrno != None:
if expectedErrno == self.errno:
@ -115,49 +216,25 @@ class TDSql:
return self.error_info
def query(self, sql, row_tag=None, queryTimes=10, count_expected_res=None):
#
# get session
#
def getData(self, row, col):
self.checkRowCol(row, col)
return self.res[row][col]
def getResult(self, sql):
self.sql = sql
i=1
while i <= queryTimes:
try:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
if count_expected_res is not None:
counter = 0
while count_expected_res != self.queryResult[0][0]:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
if counter < queryTimes:
counter += 0.5
time.sleep(0.5)
else:
return False
if row_tag:
return self.queryResult
return self.queryRows
except Exception as e:
tdLog.notice("Try to query again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
def is_err_sql(self, sql):
err_flag = True
try:
self.cursor.execute(sql)
except BaseException:
err_flag = False
return False if err_flag else True
self.res = self.cursor.fetchall()
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return self.res
def getVariable(self, search_attr):
'''
@ -193,29 +270,19 @@ class TDSql:
return col_name_list, col_type_list
return col_name_list
def waitedQuery(self, sql, expectRows, timeout):
tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout))
self.sql = sql
try:
for i in range(timeout):
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows))
if self.queryRows >= expectRows:
return (self.queryRows, i)
time.sleep(1)
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return (self.queryRows, timeout)
def getRows(self):
return self.queryRows
# get first value
def getFirstValue(self, sql) :
self.query(sql)
return self.getData(0, 0)
#
# check session
#
def checkRows(self, expectRows):
if self.queryRows == expectRows:
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows))
@ -273,26 +340,26 @@ class TDSql:
self.checkRowCol(row, col)
if self.queryResult[row][col] != data:
if self.res[row][col] != data:
if self.cursor.istype(col, "TIMESTAMP"):
# suppose user want to check nanosecond timestamp if a longer data passed``
if isinstance(data,str) :
if (len(data) >= 28):
if self.queryResult[row][col] == _parse_ns_timestamp(data):
if self.res[row][col] == _parse_ns_timestamp(data):
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
else:
if self.queryResult[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
if self.res[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
elif isinstance(data,int):
@ -304,72 +371,72 @@ class TDSql:
precision = 'ns'
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
success = False
if precision == 'ms':
dt_obj = self.queryResult[row][col]
dt_obj = self.res[row][col]
ts = int(int((dt_obj-datetime.datetime.fromtimestamp(0,dt_obj.tzinfo)).total_seconds())*1000) + int(dt_obj.microsecond/1000)
if ts == data:
success = True
elif precision == 'us':
dt_obj = self.queryResult[row][col]
dt_obj = self.res[row][col]
ts = int(int((dt_obj-datetime.datetime.fromtimestamp(0,dt_obj.tzinfo)).total_seconds())*1e6) + int(dt_obj.microsecond)
if ts == data:
success = True
elif precision == 'ns':
if data == self.queryResult[row][col]:
if data == self.res[row][col]:
success = True
if success:
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
elif isinstance(data,datetime.datetime):
dt_obj = self.queryResult[row][col]
dt_obj = self.res[row][col]
delt_data = data-datetime.datetime.fromtimestamp(0,data.tzinfo)
delt_result = self.queryResult[row][col] - datetime.datetime.fromtimestamp(0,self.queryResult[row][col].tzinfo)
delt_result = self.res[row][col] - datetime.datetime.fromtimestamp(0,self.res[row][col].tzinfo)
if delt_data == delt_result:
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
if str(self.queryResult[row][col]) == str(data):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
if str(self.res[row][col]) == str(data):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
return
elif isinstance(data, float):
if abs(data) >= 1 and abs((self.queryResult[row][col] - data) / data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
if abs(data) >= 1 and abs((self.res[row][col] - data) / data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
elif abs(data) < 1 and abs(self.queryResult[row][col] - data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
elif abs(data) < 1 and abs(self.res[row][col] - data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
if(show):
tdLog.info("check successfully")
@ -397,23 +464,23 @@ class TDSql:
def checkDataNoExit(self, row, col, data):
if self.checkRowColNoExit(row, col) == False:
return False
if self.queryResult[row][col] != data:
if self.res[row][col] != data:
if self.cursor.istype(col, "TIMESTAMP"):
# suppose user want to check nanosecond timestamp if a longer data passed
if (len(data) >= 28):
if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
if pd.to_datetime(self.res[row][col]) == pd.to_datetime(data):
return True
else:
if self.queryResult[row][col] == _parse_datetime(data):
if self.res[row][col] == _parse_datetime(data):
return True
return False
if str(self.queryResult[row][col]) == str(data):
if str(self.res[row][col]) == str(data):
return True
elif isinstance(data, float):
if abs(data) >= 1 and abs((self.queryResult[row][col] - data) / data) <= 0.000001:
if abs(data) >= 1 and abs((self.res[row][col] - data) / data) <= 0.000001:
return True
elif abs(data) < 1 and abs(self.queryResult[row][col] - data) <= 0.000001:
elif abs(data) < 1 and abs(self.res[row][col] - data) <= 0.000001:
return True
else:
return False
@ -437,56 +504,6 @@ class TDSql:
self.query(sql)
self.checkData(row, col, data)
def getData(self, row, col):
self.checkRowCol(row, col)
return self.queryResult[row][col]
def getResult(self, sql):
self.sql = sql
try:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return self.queryResult
def executeTimes(self, sql, times):
for i in range(times):
try:
return self.cursor.execute(sql)
except BaseException:
time.sleep(1)
continue
def execute(self, sql, queryTimes=30, show=False):
self.sql = sql
if show:
tdLog.info(sql)
i=1
while i <= queryTimes:
try:
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
except Exception as e:
tdLog.notice("Try to execute sql again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
# execute many sql
def executes(self, sqls, queryTimes=30, show=False):
for sql in sqls:
self.execute(sql, queryTimes, show)
def checkAffectedRows(self, expectAffectedRows):
if self.affectedRows != expectAffectedRows:
caller = inspect.getframeinfo(inspect.stack()[1][0])
@ -544,17 +561,22 @@ class TDSql:
def checkAgg(self, sql, expectCnt):
self.query(sql)
self.checkData(0, 0, expectCnt)
# get first value
def getFirstValue(self, sql) :
self.query(sql)
return self.getData(0, 0)
# expect first value
def checkFirstValue(self, sql, expect):
self.query(sql)
self.checkData(0, 0, expect)
# colIdx1 value same with colIdx2
def checkSameColumn(self, c1, c2):
for i in range(self.queryRows):
if self.res[i][c1] != self.res[i][c2]:
tdLog.exit(f"Not same. row={i} col1={c1} col2={c2}. {self.res[i][c1]}!={self.res[i][c2]}")
tdLog.info(f"check {self.queryRows} rows two column value same. column index [{c1},{c2}]")
#
# others session
#
def get_times(self, time_str, precision="ms"):
caller = inspect.getframeinfo(inspect.stack()[1][0])

View File

@ -560,17 +560,6 @@ if __name__ == "__main__":
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
# tdSql.init(conn.cursor())
# tdSql.execute("create qnode on dnode 1")
# tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
# tdSql.query("show local variables;")
# for i in range(tdSql.queryRows):
# if tdSql.queryResult[i][0] == "queryPolicy" :
# if int(tdSql.queryResult[i][1]) == int(queryPolicy):
# tdLog.info('alter queryPolicy to %d successfully'%queryPolicy)
# else :
# tdLog.debug(tdSql.queryResult)
# tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
cursor = conn.cursor()
cursor.execute("create qnode on dnode 1")

View File

@ -3,7 +3,13 @@
#NA,NA,y or n,script,./test.sh -f tsim/user/basic.sim
#unit-test
archOs=$(arch)
if [[ $archOs =~ "aarch64" ]]; then
,,n,unit-test,bash test.sh
else
,,y,unit-test,bash test.sh
fi
#
# army-test
@ -11,6 +17,7 @@
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3_basic.py -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
@ -36,6 +43,7 @@
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py
@ -1244,7 +1252,9 @@ e
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
### refactor stream backend, open case after rsma refactored
#,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
,,n,script,./test.sh -f tsim/valgrind/checkError1.sim
,,n,script,./test.sh -f tsim/valgrind/checkError2.sim

View File

@ -97,7 +97,24 @@ class TDSql:
i+=1
time.sleep(1)
pass
def no_error(self, sql):
caller = inspect.getframeinfo(inspect.stack()[1][0])
expectErrOccurred = False
try:
self.cursor.execute(sql)
except BaseException as e:
expectErrOccurred = True
self.errno = e.errno
error_info = repr(e)
self.error_info = ','.join(error_info[error_info.index('(') + 1:-1].split(",")[:-1]).replace("'", "")
if expectErrOccurred:
tdLog.exit("%s(%d) failed: sql:%s, unexpect error '%s' occurred" % (caller.filename, caller.lineno, sql, self.error_info))
else:
tdLog.info("sql:%s, check passed, no ErrInfo occurred" % (sql))
def error(self, sql, expectedErrno = None, expectErrInfo = None, fullMatched = True):
caller = inspect.getframeinfo(inspect.stack()[1][0])
expectErrNotOccured = True
@ -126,9 +143,9 @@ class TDSql:
if expectErrInfo != None:
if expectErrInfo == self.error_info:
tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo))
tdLog.info("sql:%s, expected ErrInfo '%s' occured" % (sql, expectErrInfo))
else:
tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo '%s' occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo))
tdLog.exit("%s(%d) failed: sql:%s, ErrInfo '%s' occured, but not expected ErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo))
else:
if expectedErrno != None:
if expectedErrno in self.errno:
@ -138,9 +155,9 @@ class TDSql:
if expectErrInfo != None:
if expectErrInfo in self.error_info:
tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo))
tdLog.info("sql:%s, expected ErrInfo '%s' occured" % (sql, expectErrInfo))
else:
tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo %s occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo))
tdLog.exit("%s(%d) failed: sql:%s, ErrInfo %s occured, but not expected ErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo))
return self.error_info

View File

@ -1505,7 +1505,7 @@ if $data10 != @21-05-05 18:19:21.000@ then
return -1
endi
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and (a.c1 < 10 or a.c1 > 30) and (b.u1 < 5 or b.u1 > 5) order by ts;
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and (a.c1 < 10 or a.c1 > 30) and (b.u1 < 5 or b.u1 > 5) order by a.ts;
if $rows != 4 then
return -1
endi
@ -1521,8 +1521,9 @@ endi
if $data30 != @21-05-05 18:19:14.000@ then
return -1
endi
sql_error select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and (a.c1 < 10 or a.c1 > 30) and (b.u1 < 5 or b.u1 > 5) order by ts;
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and a.c1 < 30 and b.u1 > 1 and a.c1 > 10 and b.u1 < 8 and b.u1<>5 order by ts;
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and a.c1 < 30 and b.u1 > 1 and a.c1 > 10 and b.u1 < 8 and b.u1<>5 order by a.ts;
if $rows != 3 then
return -1
endi
@ -1535,6 +1536,8 @@ endi
if $data20 != @21-05-05 18:19:10.000@ then
return -1
endi
sql_error select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and a.c1 < 30 and b.u1 > 1 and a.c1 > 10 and b.u1 < 8 and b.u1<>5 order by ts;
sql select a.ts,a.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and a.c1 < 30 and b.u1 > 1 and a.c1 > 10 and b.u1 < 8 and b.u1<>5 order by ts;
sql select * from stb1 where c1 is null and c1 is not null;
if $rows != 0 then
@ -2469,7 +2472,7 @@ endi
if $data10 != @21-05-05 18:19:05.000@ then
return -1
endi
sql select tb1.ts,tb1.*,tb2_1.* from tb1, tb2_1 where tb1.ts=tb2_1.ts and tb1.ts > '2021-05-05 18:19:03.000' and tb2_1.u1 < 5 order by ts;
sql select tb1.ts,tb1.*,tb2_1.* from tb1, tb2_1 where tb1.ts=tb2_1.ts and tb1.ts > '2021-05-05 18:19:03.000' and tb2_1.u1 < 5 order by tb1.ts;
if $rows != 2 then
return -1
endi
@ -2480,7 +2483,7 @@ if $data10 != @21-05-05 18:19:06.000@ then
return -1
endi
sql select tb1.ts,tb1.*,tb2_1.* from tb1, tb2_1 where tb1.ts=tb2_1.ts and tb1.ts >= '2021-05-05 18:19:03.000' and tb1.c7=false and tb2_1.u3>4 order by ts;
sql select tb1.ts,tb1.*,tb2_1.* from tb1, tb2_1 where tb1.ts=tb2_1.ts and tb1.ts >= '2021-05-05 18:19:03.000' and tb1.c7=false and tb2_1.u3>4 order by tb1.ts;
if $rows != 2 then
return -1
endi
@ -2491,7 +2494,7 @@ if $data10 != @21-05-05 18:19:07.000@ then
return -1
endi
sql select stb1.ts,stb1.c1,stb1.t1,stb2.ts,stb2.u1,stb2.t4 from stb1, stb2 where stb1.ts=stb2.ts and stb1.t1 = stb2.t4 order by ts;
sql select stb1.ts,stb1.c1,stb1.t1,stb2.ts,stb2.u1,stb2.t4 from stb1, stb2 where stb1.ts=stb2.ts and stb1.t1 = stb2.t4 order by stb1.ts;
if $rows != 9 then
return -1
endi
@ -2523,7 +2526,7 @@ if $data80 != @21-05-05 18:19:11.000@ then
return -1
endi
sql select stb1.ts,stb1.c1,stb1.t1,stb2.ts,stb2.u1,stb2.t4 from stb1, stb2 where stb1.ts=stb2.ts and stb1.t1 = stb2.t4 and stb1.c1 > 2 and stb2.u1 <=4 order by ts;
sql select stb1.ts,stb1.c1,stb1.t1,stb2.ts,stb2.u1,stb2.t4 from stb1, stb2 where stb1.ts=stb2.ts and stb1.t1 = stb2.t4 and stb1.c1 > 2 and stb2.u1 <=4 order by stb1.ts;
if $rows != 3 then
return -1
endi

View File

@ -129,10 +129,10 @@ sql select a.ts,a.c1,a.c8 from (select * from stb1 where c7=true) a, (select * f
sql select * from stb1 where (c6 > 3.0 or c6 < 60) and c6 > 50 and (c6 != 53 or c6 != 63);;
sql select ts,c1 from stb1 where (c1 > 60 or c1 < 10 or (c1 > 20 and c1 < 30)) and ts > '2021-05-05 18:19:00.000' and ts < '2021-05-05 18:19:25.000' and c1 != 21 and c1 != 22 order by ts;
sql select a.* from (select * from stb1 where c7=true) a, (select * from stb1 where c1 > 30) b where a.ts=b.ts and a.c1 > 50 order by ts;;
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and (a.c1 < 10 or a.c1 > 30) and (b.u1 < 5 or b.u1 > 5) order by ts;;
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and a.c1 < 30 and b.u1 > 1 and a.c1 > 10 and b.u1 < 8 and b.u1<>5 order by ts;;
sql select tb1.ts,tb1.*,tb2_1.* from tb1, tb2_1 where tb1.ts=tb2_1.ts and tb1.ts >= '2021-05-05 18:19:03.000' and tb1.c7=false and tb2_1.u3>4 order by ts;;
sql select stb1.ts,stb1.c1,stb1.t1,stb2.ts,stb2.u1,stb2.t4 from stb1, stb2 where stb1.ts=stb2.ts and stb1.t1 = stb2.t4 order by ts;;
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and (a.c1 < 10 or a.c1 > 30) and (b.u1 < 5 or b.u1 > 5) order by a.ts;;
sql select a.ts,b.ts,a.c1,b.u1,b.u2 from (select * from stb1) a, (select * from stb2) b where a.ts=b.ts and a.c1 < 30 and b.u1 > 1 and a.c1 > 10 and b.u1 < 8 and b.u1<>5 order by a.ts;;
sql select tb1.ts,tb1.*,tb2_1.* from tb1, tb2_1 where tb1.ts=tb2_1.ts and tb1.ts >= '2021-05-05 18:19:03.000' and tb1.c7=false and tb2_1.u3>4 order by tb1.ts;;
sql select stb1.ts,stb1.c1,stb1.t1,stb2.ts,stb2.u1,stb2.t4 from stb1, stb2 where stb1.ts=stb2.ts and stb1.t1 = stb2.t4 order by stb1.ts;;
sql select count(*) from stb1 where tbname like 'tb%' or c1 > 0;;
sql select * from stb1 where tbname like 'tb%' and (t1=1 or t2=2 or t3=3) and t1 > 2 order by ts;;

View File

@ -280,7 +280,9 @@
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
### refactor stream backend, open case after rsma refactored
#./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
./test.sh -f tsim/valgrind/checkError1.sim
./test.sh -f tsim/valgrind/checkError2.sim

View File

@ -861,11 +861,56 @@ class TDTestCase:
self.support_super_table_test()
def initLastRowDelayTest(self, dbname="db"):
tdSql.execute(f"drop database if exists {dbname} ")
create_db_sql = f"create database if not exists {dbname} keep 3650 duration 1000 cachemodel 'NONE' REPLICA 1"
tdSql.execute(create_db_sql)
time.sleep(3)
tdSql.execute(f"use {dbname}")
tdSql.execute(f'create stable {dbname}.st(ts timestamp, v_int int, v_float float) TAGS (ctname varchar(32))')
tdSql.execute(f"create table {dbname}.ct1 using {dbname}.st tags('ct1')")
tdSql.execute(f"create table {dbname}.ct2 using {dbname}.st tags('ct2')")
tdSql.execute(f"insert into {dbname}.st(tbname,ts,v_float, v_int) values('ct1',1630000000000,86,86)")
tdSql.execute(f"insert into {dbname}.st(tbname,ts,v_float, v_int) values('ct1',1630000021255,59,59)")
tdSql.execute(f'flush database {dbname}')
tdSql.execute(f'select last(*) from {dbname}.st')
tdSql.execute(f'select last_row(*) from {dbname}.st')
tdSql.execute(f"insert into {dbname}.st(tbname,ts) values('ct1',1630000091255)")
tdSql.execute(f'flush database {dbname}')
tdSql.execute(f'select last(*) from {dbname}.st')
tdSql.execute(f'select last_row(*) from {dbname}.st')
tdSql.execute(f'alter database {dbname} cachemodel "both"')
tdSql.query(f'select last(*) from {dbname}.st')
tdSql.checkData(0 , 1 , 59)
tdSql.query(f'select last_row(*) from {dbname}.st')
tdSql.checkData(0 , 1 , None)
tdSql.checkData(0 , 2 , None)
tdLog.printNoPrefix("========== delay test init success ==============")
def lastRowDelayTest(self, dbname="db"):
tdLog.printNoPrefix("========== delay test start ==============")
tdSql.execute(f"use {dbname}")
tdSql.query(f'select last(*) from {dbname}.st')
tdSql.checkData(0 , 1 , 59)
tdSql.query(f'select last_row(*) from {dbname}.st')
tdSql.checkData(0 , 1 , None)
tdSql.checkData(0 , 2 , None)
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
# tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table ==============")
self.initLastRowDelayTest("DELAYTEST")
# cache_last 0
self.prepare_datas("'NONE' ")
self.prepare_tag_datas("'NONE'")
@ -890,6 +935,8 @@ class TDTestCase:
self.insert_datas_and_check_abs(self.tb_nums,self.row_nums,self.time_step,"'BOTH'")
self.basic_query()
self.lastRowDelayTest("DELAYTEST")
def stop(self):
tdSql.close()

View File

@ -278,19 +278,19 @@ class TDTestCase:
def queryOrderByAgg(self):
tdSql.query("SELECT COUNT(*) FROM t1 order by COUNT(*)")
tdSql.no_error("SELECT COUNT(*) FROM t1 order by COUNT(*)")
tdSql.query("SELECT COUNT(*) FROM t1 order by last(c2)")
tdSql.no_error("SELECT COUNT(*) FROM t1 order by last(c2)")
tdSql.query("SELECT c1 FROM t1 order by last(ts)")
tdSql.no_error("SELECT c1 FROM t1 order by last(ts)")
tdSql.query("SELECT ts FROM t1 order by last(ts)")
tdSql.no_error("SELECT ts FROM t1 order by last(ts)")
tdSql.query("SELECT last(ts), ts, c1 FROM t1 order by 2")
tdSql.no_error("SELECT last(ts), ts, c1 FROM t1 order by 2")
tdSql.query("SELECT ts, last(ts) FROM t1 order by last(ts)")
tdSql.no_error("SELECT ts, last(ts) FROM t1 order by last(ts)")
tdSql.query(f"SELECT * FROM t1 order by last(ts)")
tdSql.no_error(f"SELECT * FROM t1 order by last(ts)")
tdSql.query(f"SELECT last(ts) as t2, ts FROM t1 order by 1")
tdSql.checkRows(1)
@ -302,6 +302,18 @@ class TDTestCase:
tdSql.error(f"SELECT last(ts) as t2, ts FROM t1 order by last(t2)")
def queryOrderByAmbiguousName(self):
tdSql.error(sql="select c1 as name, c2 as name, c3 from t1 order by name", expectErrInfo='ambiguous',
fullMatched=False)
tdSql.error(sql="select c1, c2 as c1, c3 from t1 order by c1", expectErrInfo='ambiguous', fullMatched=False)
tdSql.error(sql='select last(ts), last(c1) as name ,last(c2) as name,last(c3) from t1 order by name',
expectErrInfo='ambiguous', fullMatched=False)
tdSql.no_error("select c1 as name, c2 as c1, c3 from t1 order by c1")
tdSql.no_error('select c1 as name from (select c1, c2 as name from st) order by name')
# run
def run(self):
@ -317,6 +329,8 @@ class TDTestCase:
# agg
self.queryOrderByAgg()
# td-28332
self.queryOrderByAmbiguousName()
# stop
def stop(self):

View File

@ -0,0 +1,65 @@
from wsgiref.headers import tspecials
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.rowNum = 10
self.batchNum = 5
self.ts = 1537146000000
def run(self):
dbname = "db"
tdSql.prepare()
tdSql.execute(f'''create table sta(ts timestamp, col1 int, col2 bigint) tags(tg1 int, tg2 binary(20))''')
tdSql.execute(f"create table sta1 using sta tags(1, 'a')")
tdSql.execute(f"create table sta2 using sta tags(2, 'b')")
tdSql.execute(f"create table sta3 using sta tags(3, 'c')")
tdSql.execute(f"create table sta4 using sta tags(4, 'a')")
tdSql.execute(f"insert into sta1 values(1537146000001, 11, 110)")
tdSql.execute(f"insert into sta1 values(1537146000002, 12, 120)")
tdSql.execute(f"insert into sta1 values(1537146000003, 13, 130)")
tdSql.execute(f"insert into sta2 values(1537146000001, 21, 210)")
tdSql.execute(f"insert into sta2 values(1537146000002, 22, 220)")
tdSql.execute(f"insert into sta2 values(1537146000003, 23, 230)")
tdSql.execute(f"insert into sta3 values(1537146000001, 31, 310)")
tdSql.execute(f"insert into sta3 values(1537146000002, 32, 320)")
tdSql.execute(f"insert into sta3 values(1537146000003, 33, 330)")
tdSql.execute(f"insert into sta4 values(1537146000001, 41, 410)")
tdSql.execute(f"insert into sta4 values(1537146000002, 42, 420)")
tdSql.execute(f"insert into sta4 values(1537146000003, 43, 430)")
tdSql.execute(f'''create table stb(ts timestamp, col1 int, col2 bigint) tags(tg1 int, tg2 binary(20))''')
tdSql.execute(f"create table stb1 using stb tags(1, 'a')")
tdSql.execute(f"create table stb2 using stb tags(2, 'b')")
tdSql.execute(f"create table stb3 using stb tags(3, 'c')")
tdSql.execute(f"create table stb4 using stb tags(4, 'a')")
tdSql.execute(f"insert into stb1 values(1537146000001, 911, 9110)")
tdSql.execute(f"insert into stb1 values(1537146000002, 912, 9120)")
tdSql.execute(f"insert into stb1 values(1537146000003, 913, 9130)")
tdSql.execute(f"insert into stb2 values(1537146000001, 921, 9210)")
tdSql.execute(f"insert into stb2 values(1537146000002, 922, 9220)")
tdSql.execute(f"insert into stb2 values(1537146000003, 923, 9230)")
tdSql.execute(f"insert into stb3 values(1537146000001, 931, 9310)")
tdSql.execute(f"insert into stb3 values(1537146000002, 932, 9320)")
tdSql.execute(f"insert into stb3 values(1537146000003, 933, 9330)")
tdSql.execute(f"insert into stb4 values(1537146000001, 941, 9410)")
tdSql.execute(f"insert into stb4 values(1537146000002, 942, 9420)")
tdSql.execute(f"insert into stb4 values(1537146000003, 943, 9430)")
tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2");
tdSql.checkRows(2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())