Merge remote-tracking branch 'origin/3.0' into fix/mnode

This commit is contained in:
Shengliang Guan 2022-07-20 19:20:07 +08:00
commit 76eeeb8cac
29 changed files with 253 additions and 215 deletions

View File

@ -73,7 +73,6 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
}
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);

View File

@ -33,16 +33,16 @@ extern "C" {
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
// clang-format on
#define WAL_PROTO_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalCkHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_PROTO_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum {
TAOS_WAL_WRITE = 1,

View File

@ -421,7 +421,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3)
#define TSDB_MAX_MSG_SIZE (1024 * 1024 * 10)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P

View File

@ -45,7 +45,6 @@ void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
char *strDupUnquo(const char *src);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;

View File

@ -40,11 +40,11 @@ bool tsPrintAuth = false;
// multi process
int32_t tsMultiProcess = 0;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsMnodeShmSize = TSDB_MAX_MSG_SIZE * 2 + 1024;
int32_t tsVnodeShmSize = TSDB_MAX_MSG_SIZE * 10 + 1024;
int32_t tsQnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
int32_t tsSnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
int32_t tsBnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
int32_t tsNumOfShmThreads = 1;
// queue & threads
@ -387,11 +387,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
@ -447,8 +447,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, INT64_MAX, 0) != 0)
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;

View File

@ -5349,6 +5349,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->action) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->colId) < 0) return -1;
switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
@ -5399,6 +5400,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1;
switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;

View File

@ -700,6 +700,8 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
numOfMonth *= 12;
}
int64_t fraction = t % TSDB_TICK_PER_SECOND(precision);
struct tm tm;
time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
taosLocalTime(&tt, &tm);
@ -707,35 +709,9 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction);
}
int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision) {
if (duration == 0) {
return t;
}
if (unit != 'n' && unit != 'y') {
return t - duration;
}
// The following code handles the y/n time duration
int64_t numOfMonth = duration;
if (unit == 'y') {
numOfMonth *= 12;
}
struct tm tm;
time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
taosLocalTime(&tt, &tm);
int32_t mon = tm.tm_year * 12 + tm.tm_mon - (int32_t)numOfMonth;
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
}
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) {
if (ekey < skey) {
int64_t tmp = ekey;
@ -844,11 +820,14 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
} else {
// try to move current window to the left-hande-side, due to the offset effect.
int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
ASSERT(end >= t);
end = taosTimeAdd(end, -pInterval->sliding, pInterval->slidingUnit, precision);
if (end >= t) {
start = taosTimeAdd(start, -pInterval->sliding, pInterval->slidingUnit, precision);
int64_t newEnd = end;
while(newEnd >= t) {
end = newEnd;
newEnd = taosTimeAdd(newEnd, -pInterval->sliding, pInterval->slidingUnit, precision);
}
start = taosTimeAdd(end, -pInterval->interval, pInterval->intervalUnit, precision) + 1;
}
}

View File

@ -231,7 +231,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read sdb file:%s", file);
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
SSdbRaw *pRaw = taosMemoryMalloc(TSDB_MAX_MSG_SIZE + 100);
if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed read sdb file since %s", terrstr());
@ -556,8 +556,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
if (term != NULL) *term = commitTerm;
if (config != NULL) *config = commitConfig;
mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
pIter, commitIndex, commitTerm, commitConfig, pIter->name);
mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
" file:%s",
pIter, commitIndex, commitTerm, commitConfig, pIter->name);
return 0;
}
@ -669,4 +670,4 @@ int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
pIter->total += writelen;
mDebug("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
return 0;
}
}

View File

@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true,
.version = ver,
};
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
pHandle->execHandle.execCol.task[i] =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
@ -679,9 +680,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
//
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessRunReq(pTask);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRunReq(*ppTask);
return 0;
} else {
return -1;
@ -696,14 +697,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp);
streamProcessDispatchReq(*ppTask, &req, &rsp);
return 0;
} else {
return -1;
@ -713,9 +714,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessRecoverReq(pTask, pReq, pMsg);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverReq(*ppTask, pReq, pMsg);
return 0;
} else {
return -1;
@ -725,9 +726,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessDispatchRsp(*ppTask, pRsp);
return 0;
} else {
return -1;
@ -737,9 +738,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessRecoverRsp(pTask, pRsp);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverRsp(*ppTask, pRsp);
return 0;
} else {
return -1;
@ -749,10 +750,10 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (pTask) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (ppTask) {
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING);
}
// todo
// clear queue
@ -780,16 +781,17 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
int32_t taskId = req.dstTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(*ppTask, &req, &rsp);
} else {
return -1;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
}

View File

@ -2572,14 +2572,41 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
return TSDB_CODE_SUCCESS;
}
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb* pTsdb) {
tb_uid_t suid = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
metaReaderClear(&mr); // table not esist
return 0;
}
if (mr.me.type == TSDB_CHILD_TABLE) {
suid = mr.me.ctbEntry.suid;
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
suid = 0;
} else {
suid = 0;
}
metaReaderClear(&mr);
return suid;
}
void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
int32_t sversion = TSDBROW_SVERSION(pRow);
tb_uid_t suid = getTableSuidByUid(uid, pReader->pTsdb);
if (pReader->pSchema == NULL) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion);
// pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion);
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, suid, uid, sversion, &pReader->pSchema);
} else if (pReader->pSchema->version != sversion) {
taosMemoryFreeClear(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion);
// pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion);
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, suid, uid, sversion, &pReader->pSchema);
}
}

View File

@ -744,8 +744,8 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SNode* pCondition;
SLimitInfo limitInfo;
SNode* pCondition;
} SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
@ -785,7 +785,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
@ -797,7 +797,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);

View File

@ -50,7 +50,7 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
initResultSizeInfo(pOperator, 1024);
initResultSizeInfo(&pOperator->resultInfo, 1024);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));

View File

@ -824,10 +824,10 @@ int32_t convertFillType(int32_t mode) {
static void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
if (ascQuery) {
getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w);
*w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts);
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w);
*w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts);
int64_t key = w->skey;
while (key < ts) { // moving towards end

View File

@ -834,18 +834,20 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
/////////////////////////////////////////////////////////////////////////////////////////////
// todo refactor : return window
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
win->skey = taosTimeTruncate(key, pInterval, precision);
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
STimeWindow win = {0};
win.skey = taosTimeTruncate(key, pInterval, precision);
/*
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
*/
win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
if (win->ekey < win->skey) {
win->ekey = INT64_MAX;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
if (win.ekey < win.skey) {
win.ekey = INT64_MAX;
}
return win;
}
#if 0
@ -3603,13 +3605,13 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
return TSDB_CODE_SUCCESS;
}
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) {
ASSERT(numOfRows != 0);
pOperator->resultInfo.capacity = numOfRows;
pOperator->resultInfo.threshold = numOfRows * 0.75;
pResultInfo->capacity = numOfRows;
pResultInfo->threshold = numOfRows * 0.75;
if (pOperator->resultInfo.threshold == 0) {
pOperator->resultInfo.threshold = numOfRows;
if (pResultInfo->threshold == 0) {
pResultInfo->threshold = numOfRows;
}
}
@ -3672,7 +3674,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
int32_t numOfRows = 1024;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows);
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -3827,7 +3829,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
numOfRows = TWOMB / pResBlock->info.rowSize;
}
initResultSizeInfo(pOperator, numOfRows);
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
@ -4005,7 +4007,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
numOfRows = TWOMB / pResBlock->info.rowSize;
}
initResultSizeInfo(pOperator, numOfRows);
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
@ -4044,8 +4046,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
STimeWindow w = TSWINDOW_INITIALIZER;
getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
int32_t order = TSDB_ORDER_ASC;
@ -4082,7 +4083,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
int32_t type = convertFillType(pPhyFillNode->mode);
SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
int32_t numOfOutputCols = 0;

View File

@ -406,7 +406,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
goto _error;
}
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);

View File

@ -41,7 +41,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator";

View File

@ -125,7 +125,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
}
if (order == TSDB_ORDER_ASC) {
getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w);
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
assert(w.ekey >= pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey) {
@ -144,7 +144,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
}
}
} else {
getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey, &w);
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
assert(w.skey <= pBlockInfo->window.ekey);
if (w.skey > pBlockInfo->window.skey) {
@ -2324,7 +2324,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->pCondition = pScanNode->node.pConditions;
pInfo->scanCols = colList;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
tNameAssign(&pInfo->name, &pScanNode->tableName);
const char* name = tNameGetTableName(&pInfo->name);
@ -2554,7 +2554,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet =
@ -3099,7 +3099,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 1024);
initResultSizeInfo(&pOperator->resultInfo, 1024);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,

View File

@ -26,7 +26,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
@ -41,13 +41,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pCondition = pSortNode->node.pConditions;
pInfo->pColMatchInfo = pColMatchColInfo;
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true;
@ -208,26 +210,44 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = NULL;
while (1) {
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
if (pBlock != NULL) {
doFilter(pInfo->pCondition, pBlock);
}
pInfo->pColMatchInfo, pInfo);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
return NULL;
}
if (blockDataGetNumOfRows(pBlock) > 0) {
doFilter(pInfo->pCondition, pBlock);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
}
// todo add the limit/offset info
if (pInfo->limitInfo.remainOffset > 0) {
if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) {
pInfo->limitInfo.remainOffset -= pBlock->info.rows;
continue;
}
blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset);
pInfo->limitInfo.remainOffset = 0;
}
if (pInfo->limitInfo.limit.limit > 0 &&
pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) {
int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows;
blockDataKeepFirstNRows(pBlock, remain);
}
size_t numOfRows = blockDataGetNumOfRows(pBlock);
pInfo->limitInfo.numOfOutputRows += numOfRows;
pOperator->resultInfo.totalRows += numOfRows;
if (numOfRows > 0) {
break;
}
}
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
}
return pBlock;
return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL;
}
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
@ -479,7 +499,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
;
@ -711,7 +731,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(pOperator, 1024);
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->groupSort = pMergePhyNode->groupSort;
pInfo->binfo.pRes = pResBlock;

View File

@ -652,7 +652,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("======printDataBlock: Block is Null or Empty");
qDebug("===stream===printDataBlock: Block is Null or Empty");
return;
}
char* pBuf = NULL;
@ -772,12 +772,6 @@ int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_f
return midPos;
}
int64_t getReskey(void* data, int32_t index) {
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
return *(int64_t*)pos->key;
}
int32_t compareResKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
@ -1537,8 +1531,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
qDebug("===stream===single interval is done");
freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
}
printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
@ -1772,7 +1768,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SExprSupp* pSup = &pOperator->exprSupp;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
@ -1853,7 +1849,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows);
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
@ -2313,7 +2309,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
@ -2361,7 +2357,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
@ -2409,7 +2405,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
@ -2985,7 +2981,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
@ -3161,7 +3157,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error;
}
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
@ -4425,7 +4421,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
@ -4675,7 +4671,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->primaryTsIndex = primaryTsSlotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code =
initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
@ -4981,7 +4977,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
SExprSupp* pExprSupp = &pOperator->exprSupp;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&iaInfo->binfo, pResBlock);

View File

@ -958,6 +958,7 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err
return false;
}
cJSON_Delete(binDesc);
taosMemoryFree(intervals);
return true;
}

View File

@ -2750,6 +2750,7 @@ static bool getHistogramBinDesc(SHistoFuncBin** bins, int32_t* binNum, char* bin
(*bins)[i].count = 0;
}
cJSON_Delete(binDesc);
taosMemoryFree(intervals);
return true;
}

View File

@ -1195,7 +1195,7 @@ static void vectorMathTsSubHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pR
colDataAppendNULL(pOutputCol, i);
continue; // TODO set null or ignore
}
*output = taosTimeSub(getVectorBigintValueFnLeft(pLeftCol->pData, i), getVectorBigintValueFnRight(pRightCol->pData, 0),
*output = taosTimeAdd(getVectorBigintValueFnLeft(pLeftCol->pData, i), -getVectorBigintValueFnRight(pRightCol->pData, 0),
pRightCol->info.scale, pRightCol->info.precision);
}

View File

@ -40,7 +40,6 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
}
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL);
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0);
#if 0
@ -55,7 +54,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
int readSize = TMIN(WAL_MAX_SIZE + 2, fileSize);
int32_t readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize);
pLastFileInfo->fileSize = fileSize;
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
@ -73,7 +72,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
return -1;
}
taosLSeekFile(pFile, -readSize, SEEK_END);
int64_t offset;
offset = taosLSeekFile(pFile, -readSize, SEEK_END);
if (readSize != taosReadFile(pFile, buf, readSize)) {
taosMemoryFree(buf);
taosCloseFile(&pFile);
@ -81,31 +81,56 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
return -1;
}
char* haystack = buf;
char* found = NULL;
char* candidate;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate
SWalCkHead* logContent = (SWalCkHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = candidate;
while (1) {
char* haystack = buf;
char* candidate;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate
SWalCkHead* logContent = (SWalCkHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = candidate;
}
haystack = candidate + 1;
}
haystack = candidate + 1;
}
if (found == buf) {
SWalCkHead* logContent = (SWalCkHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted
if (found || offset == 0) break;
offset = TMIN(0, offset - readSize + sizeof(uint64_t));
int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET);
ASSERT(offset == offset2);
if (readSize != taosReadFile(pFile, buf, readSize)) {
taosMemoryFree(buf);
taosCloseFile(&pFile);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
#if 0
if (found == buf) {
SWalCkHead* logContent = (SWalCkHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted
taosMemoryFree(buf);
taosCloseFile(&pFile);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
}
#endif
}
taosCloseFile(&pFile);
SWalCkHead* lastEntry = (SWalCkHead*)found;
// TODO truncate file
return lastEntry->head.version;
if (found == NULL) {
// file corrupted, no complete log
// TODO delete and search in previous files
ASSERT(0);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
SWalCkHead* lastEntry = (SWalCkHead*)found;
int64_t retVer = lastEntry->head.version;
taosCloseFile(&pFile);
taosMemoryFree(buf);
return retVer;
}
int walCheckAndRepairMeta(SWal* pWal) {

View File

@ -436,11 +436,6 @@ END:
}
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
int64_t index = pWal->vers.lastVer + 1;
@ -472,10 +467,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
int32_t bodyLen) {
int32_t code = 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
// concurrency control:

View File

@ -64,20 +64,6 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing
}
char *strDupUnquo(const char *src) {
if (src == NULL) return NULL;
if (src[0] != '`') return strdup(src);
int32_t len = (int32_t)strlen(src);
if (src[len - 1] != '`') return NULL;
char *ret = taosMemoryMalloc(len);
if (ret == NULL) return NULL;
for (int32_t i = 0; i < len - 1; i++) {
ret[i] = src[i + 1];
}
ret[len - 1] = 0;
return ret;
}
size_t strtrim(char *z) {
int32_t i = 0;
int32_t j = 0;

View File

@ -185,6 +185,7 @@ print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
if $rows != 4 then
print expect 4, actual: $rows
return -1
endi
if $data00 != @21-12-08 00:00:00.000@ then

View File

@ -43,7 +43,8 @@ sql select cast(1 as timestamp)+1n;
if $rows != 1 then
return -1
endi
if $data00 != @70-02-01 08:00:00.000@ then
if $data00 != @70-02-01 08:00:00.001@ then
print expect 70-02-01 08:00:00.001, actual: $data00
return -1
endi
@ -52,11 +53,13 @@ if $rows != 1 then
return -1
endi
sql select cast(1 as timestamp)-1y;
# there is an *bug* in print timestamp that smaller than 0, so let's try value that is greater than 0.
sql select cast(1 as timestamp)+1y;
if $rows != 1 then
return -1
endi
if $data00 != @69-01-01 08:00:00.000@ then
if $data00 != @71-01-01 08:00:00.001@ then
print expect 71-01-01 08:00:00.001 , actual: $data00
return -1
endi

View File

@ -14,6 +14,7 @@ class TDTestCase:
clientCfgDict["debugFlag"] = 131
updatecfgDict = {'clientCfg': {}}
updatecfgDict = {'debugFlag': 131}
updatecfgDict = {'keepColumnName': 1}
updatecfgDict["clientCfg"] = clientCfgDict
def init(self, conn, logSql):
@ -42,7 +43,7 @@ class TDTestCase:
tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')")
else:
tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
for j in range(10):
for j in range(10):
for i in range(100):
tdSql.execute(
f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
@ -92,23 +93,23 @@ class TDTestCase:
# test partition interval limit (PRcore-TD-17410)
# tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
# tdSql.checkRows(10)
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
tdSql.checkRows(10)
# test partition interval Pseudo time-column
tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
# 1 high-load:
# tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;")
tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;")
# tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;")
tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;")
# 2 stationary-trucks
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)")
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name")
# 3 long-driving-sessions
# tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;")
tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;")
#4 long-daily-sessions
@ -130,15 +131,18 @@ class TDTestCase:
# 8. daily-activity
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
# 9. breakdown-frequency
# NULL ---count(NULL)=0 expect count(NULL)= 100
tdSql.query("select tbname,count(model),model from readings partition by tbname,model;")
# model=NULL count(other) is 0
tdSql.query("select tbname,count(name),model from readings where model=NULL partition by tbname,model;")
tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ")
tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;")
#it's already supported:
# last-loc
tdSql.query("SELECT last_row(ts),latitude,longitude,name,driver FROM readings WHERE fleet='South' and name IS NOT NULL partition BY name,driver order by name ;")

@ -1 +1 @@
Subproject commit 2b75339b8b5c239619d1f09970d03075c58140dd
Subproject commit f84cb6e51556d8030585128c2b252aa2a6453328