Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/hz_temp_3.0

Commit 179be4365b
Hongze Cheng, 2024-09-29 09:14:09 +08:00
17 changed files with 244 additions and 151 deletions


@ -7,7 +7,17 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo firstEp localhost:6030 > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo fqdn localhost >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo serverPort 6030 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo debugFlag 135 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo asyncLog 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo supportVnodes 1024 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo numOfLogLines 300000000 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logKeepDays -1 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo checkpointInterval 60 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo snodeAddress 127.0.0.1:873 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg


@ -297,11 +297,22 @@ static void *vmOpenVnodeInThread(void *param) {
SVnodeMgmt *pMgmt = pThread->pMgmt;
char path[TSDB_FILENAME_LEN];
dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SWrapperCfg *pCfg = &pThread->pCfgs[v];
if (pCfg->dropped) {
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
tmsgReportStartup("vnode-destroy", stepDesc);
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
pThread->updateVnodesList = true;
continue;
}
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
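
The change above teaches the vnode-open worker thread to also destroy vnodes whose config is flagged dropped, rather than only opening them. A condensed, self-contained sketch of that per-entry decision (hypothetical names and printf stand-ins, not part of the commit):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct { int32_t vgId; bool dropped; } VnodeCfg;

/* For each configured vnode: destroy it if it was marked dropped, otherwise open it. */
void openOrDestroyVnodes(const VnodeCfg *cfgs, int32_t num) {
  for (int32_t v = 0; v < num; ++v) {
    if (cfgs[v].dropped) {
      printf("vgId:%d, destroying dropped vnode\n", cfgs[v].vgId);  /* stands in for vnodeDestroy() */
      continue;                                                     /* never falls through to the open path */
    }
    printf("vgId:%d, opening vnode\n", cfgs[v].vgId);               /* stands in for vnodeOpen() */
  }
}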


@ -1012,10 +1012,10 @@ _OVER:
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, pStb->name);
TAOS_CHECK_RETURN (mndTransCheckConflict(pMnode, pTrans));
TAOS_CHECK_RETURN (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
return 0;
}
@ -1051,7 +1051,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {
.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
@ -1500,8 +1500,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
@ -1562,8 +1562,8 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -1616,8 +1616,8 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SSmaObj *pSma = NULL;
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
@ -2233,7 +2233,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
int32_t code = 0;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
@ -2278,7 +2278,7 @@ static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bo
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
int32_t code = 0;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
@ -2302,7 +2302,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
int32_t code = 0;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
@ -2656,7 +2656,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0};
SName name = {0};
int32_t ret = 0;
if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
@ -2779,8 +2779,8 @@ _OVER:
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
@ -2839,8 +2839,8 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2945,7 +2945,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
code = mndDropStb(pMnode, pReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0};
SName name = {0};
int32_t ret = 0;
if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
@ -3016,7 +3016,7 @@ _OVER:
mndReleaseUser(pMnode, pUser);
tFreeSTableMetaRsp(&metaRsp);
//TODO change to TAOS_RETURN
// TODO change to TAOS_RETURN
return code;
}
@ -3562,7 +3562,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -4259,7 +4259,9 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
if (code) goto _OVER;
}
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0;
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
_OVER:
tFreeSMDropTbsReq(&dropReq);
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
@ -4458,7 +4460,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
if (code) goto _end;
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0;
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_end:
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
tDecoderClear(&decoder);
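
mndAddStbToTrans now routes every step through TAOS_CHECK_RETURN, which bails out with the first failing call's code. The real macro lives in TDengine's headers; the following is only a plausible sketch of how such a check-and-return macro is commonly written, plus a toy caller:

#include <stdint.h>

#define TSDB_CODE_SUCCESS 0

#define TAOS_CHECK_RETURN(CMD)         \
  do {                                 \
    int32_t __code = (CMD);            \
    if (__code != TSDB_CODE_SUCCESS) { \
      return __code;                   \
    }                                  \
  } while (0)

/* Each step either succeeds or short-circuits the enclosing function with its error code. */
int32_t addStbToTrans(void) {
  TAOS_CHECK_RETURN(0 /* e.g. mndTransCheckConflict(...) */);
  TAOS_CHECK_RETURN(0 /* e.g. mndSetCreateStbCommitLogs(...) */);
  return 0;
}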


@ -324,7 +324,9 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
size_t len = 0;
void *pKey = taosHashGetKey(pDb, &len);
tstrncpy(p, pKey, 128);
int cpLen = (127 < len) ? 127 : len;
TAOS_STRNCPY(p, pKey, cpLen);
p[cpLen] = '\0';
int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
if (code) {
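
The rewritten copy caps the length at 127 bytes of the hash key and NUL-terminates explicitly, so an over-long or unterminated key can no longer overrun the 128-byte buffer. A standalone sketch of the same bounded-copy pattern (illustrative; TAOS_STRNCPY is assumed to behave like strncpy):

#include <stddef.h>
#include <string.h>

/* Copy at most cap-1 bytes of a length-delimited key into dst and always NUL-terminate. */
void copyKeyBounded(char *dst, size_t cap, const char *key, size_t keyLen) {
  if (cap == 0) return;
  size_t cpLen = (keyLen < cap - 1) ? keyLen : cap - 1;
  memcpy(dst, key, cpLen);
  dst[cpLen] = '\0';
}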


@ -1583,7 +1583,7 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
SCtgTSMACache *pCtgCache = NULL;
(void)tNameGetFullDbName(pName, dbFName);
CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache));
CTG_ERR_JRET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache));
if (NULL == pDbCache || !pDbCache->tsmaCache) {
goto _return;
}
@ -1613,6 +1613,7 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
ctgReleaseDBCache(pCtg, pDbCache);
return TSDB_CODE_SUCCESS;
@ -1621,6 +1622,9 @@ _return:
if (pCtgCache) {
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
}
if (pDbCache) {
ctgReleaseDBCache(pCtg, pDbCache);
}
if (pOp) {
taosMemoryFree(pOp->data);
taosMemoryFree(pOp);
@ -3996,17 +4000,20 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
if (pCache->retryFetch || hasOutOfDateTSMACache(pCache->pTsmas)) {
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName);
CTG_ERR_JRET(ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName));
if (NULL == taosArrayPush(pCtx->pResList, &(SMetaRes){0})) {
taosHashRelease(dbCache->tsmaCache, pCache);
CTG_ERR_JRET(terrno);
}
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
CTG_LOCK(CTG_WRITE, &pCache->tsmaLock);
pCache->retryFetch = false;
CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
continue;
}
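
Switching from ctgGetDBCache to ctgAcquireDBCache makes the DB cache reference-held, so the hunks add a matching ctgReleaseDBCache on the success path and on the _return error path. A generic sketch of that acquire/release-on-every-path discipline, with stand-in resource functions rather than the catalog's real API:

#include <stdint.h>

typedef struct { int refs; } DbCache;

static DbCache  g_cache;
static DbCache *acquireDbCache(void) { g_cache.refs++; return &g_cache; }  /* stand-in for ctgAcquireDBCache */
static void     releaseDbCache(DbCache *c) { if (c) c->refs--; }           /* stand-in for ctgReleaseDBCache */
static int32_t  useDbCache(DbCache *c) { (void)c; return 0; }

int32_t dropFromCache(void) {
  int32_t  code  = 0;
  DbCache *cache = acquireDbCache();
  if (cache == NULL) {
    code = -1;
    goto _return;
  }

  code = useDbCache(cache);
  if (code != 0) goto _return;

  releaseDbCache(cache);                     /* the success path drops the reference ... */
  return 0;

_return:
  if (cache != NULL) releaseDbCache(cache);  /* ... and so does every error path */
  return code;
}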


@ -890,7 +890,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
} else {
int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qDebug("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
}
@ -4294,13 +4294,13 @@ _error:
return code;
}
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
SStorageAPI* pAPI) {
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, SSDataBlock* pRes, SMetaReader* mr, SStorageAPI* pAPI) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
int32_t count = pRes->info.rows;
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
if (!item) {
@ -4360,6 +4360,8 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
} else {
pRes->info.rows++;
}
return code;
@ -4715,26 +4717,23 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
return code;
}
int32_t count = 0;
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
pRes->info.rows = 0;
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
while (pInfo->curPos < size && pRes->info.rows < pOperator->resultInfo.capacity) {
code = doTagScanOneTable(pOperator, pRes, &mr, &pTaskInfo->storageAPI);
if (code != TSDB_CODE_OUT_OF_MEMORY) {
// ignore other error
code = TSDB_CODE_SUCCESS;
}
QUERY_CHECK_CODE(code, lino, _end);
++count;
if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator);
}
}
pRes->info.rows = count;
pAPI->metaReaderFn.clearReader(&mr);
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
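
After the refactor doTagScanOneTable no longer receives a separate count: it reads and advances pRes->info.rows itself, and the caller's loop checks that counter against the operator's capacity. A simplified sketch of that shape (hypothetical types and names):

#include <stdint.h>

typedef struct { int32_t rows; int32_t capacity; } ResBlock;

/* The helper owns the row counter: it advances pRes->rows only when a row was actually produced. */
static int32_t appendOneTagRow(ResBlock *pRes) {
  /* ... populate the row at index pRes->rows ... */
  pRes->rows++;
  return 0;
}

/* The driving loop no longer keeps a separate count; it just checks the block's own row count. */
void scanTags(ResBlock *pRes, int32_t numOfTables) {
  pRes->rows = 0;
  for (int32_t i = 0; i < numOfTables && pRes->rows < pRes->capacity; ++i) {
    if (appendOneTagRow(pRes) != 0) break;
  }
}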


@ -131,9 +131,6 @@ const SSTabFltFuncDef filterDict[] = {
static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta,
size_t size, const char* dbName, int64_t* pRows);
static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns",
"ttl", "stable_name", "vgroup_id', 'uid", "type"};
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
@ -2828,12 +2825,6 @@ _end:
return code;
}
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doBlockInfoScanNext(pOperator, &pRes);
return pRes;
}
static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
@ -2852,6 +2843,8 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t));
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
taosMemoryFree(pCond->colList);
taosMemoryFree(pCond->pSlotList);
return terrno;
}
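
The added frees make the two allocations succeed or fail as a unit: if either calloc/malloc returns NULL, both pointers are released before the error is returned. A standalone sketch of the pattern (illustrative; the real code returns terrno rather than -1):

#include <stdlib.h>

/* Allocate the two buffers as a unit: on any failure, free both and report an error. */
int allocColAndSlot(void **colList, void **slotList, size_t colSize, size_t slotSize) {
  *colList  = calloc(1, colSize);
  *slotList = malloc(slotSize);
  if (*colList == NULL || *slotList == NULL) {
    free(*colList);                 /* free(NULL) is a no-op, so both calls are safe here */
    free(*slotList);
    *colList  = NULL;
    *slotList = NULL;
    return -1;
  }
  return 0;
}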


@ -278,7 +278,7 @@ static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t in
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo, bool genAfterBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t rows = pResBlock->info.rows;
@ -427,7 +427,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
if (start.key == INT64_MIN || end.key == INT64_MIN) {
if (start.key == INT64_MIN || end.key == INT64_MIN || genAfterBlock) {
colDataSetNULL(pDst, rows);
break;
}
@ -463,8 +463,13 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
if (genAfterBlock && rows == 0) {
hasInterp = false;
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
if (pkey->isNull == false) {
if (pkey->isNull == false && !genAfterBlock) {
code = colDataSetVal(pDst, rows, pkey->pData, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -836,7 +841,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo) &&
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
@ -864,7 +869,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
doKeepLinearInfo(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo) &&
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
@ -909,13 +914,12 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR ||
pSliceInfo->pPrevGroupKey == NULL) {
if (pSliceInfo->pPrevGroupKey == NULL) {
return;
}
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo);
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo, true);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}


@ -28,12 +28,12 @@
} \
} while (0)
#define CHECK_OUT_OF_MEM(p) \
do { \
if (NULL == (p)) { \
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
goto _err; \
} \
#define CHECK_OUT_OF_MEM(p) \
do { \
if (NULL == (p)) { \
pCxt->errCode = terrno; \
goto _err; \
} \
} while (0)
#define CHECK_PARSER_STATUS(pCxt) \
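
The reworked CHECK_OUT_OF_MEM stores terrno, the error code left behind by the failed allocation, instead of hard-coding TSDB_CODE_OUT_OF_MEMORY. A self-contained toy version of that flow (stand-ins throughout; the real macro writes pCxt->errCode inside the parser context, and taosMemoryMalloc is assumed to set terrno on failure):

#include <stdint.h>
#include <stdlib.h>

static int32_t terrno;                    /* stand-in for TDengine's thread-local error variable */

static void *allocOrSetErrno(size_t n) {  /* stand-in for taosMemoryMalloc */
  void *p = malloc(n);
  if (p == NULL) terrno = -1;
  return p;
}

#define CHECK_OUT_OF_MEM(p) \
  do {                      \
    if (NULL == (p)) {      \
      errCode = terrno;     \
      goto _err;            \
    }                       \
  } while (0)

int32_t buildNode(void) {
  int32_t errCode = 0;
  void   *pNode   = allocOrSetErrno(64);
  CHECK_OUT_OF_MEM(pNode);                /* propagates whatever code the allocator left, not a fixed OOM code */
  free(pNode);
  return 0;
_err:
  return errCode;
}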


@ -47,6 +47,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
SAstCreateContext cxt;
initAstCreateContext(pParseCxt, &cxt);
void* pParser = ParseAlloc((FMalloc)taosMemoryMalloc);
if (!pParser) return terrno;
int32_t i = 0;
while (1) {
SToken t0 = {0};


@ -61,6 +61,7 @@ typedef enum {
#define SCH_MAX_TASK_TIMEOUT_USEC 300000000
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_MIN_AYSNC_EXEC_NUM 3
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
typedef struct SSchDebug {
bool lockEnable;


@ -366,9 +366,11 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pEpSet) {
pCtx->roundTotal = pEpSet->numOfEps;
} else {
} else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
pCtx->roundTotal = pAddr->epSet.numOfEps;
} else {
pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
}
} else {
pCtx->roundTotal = 1;
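
With the extra branch, roundTotal is always initialized: from the supplied epset if one was passed, else from the first candidate address, else from the new SCH_DEFAULT_RETRY_TOTAL_ROUND constant. A condensed sketch of that three-way fallback, simplified away from the scheduler's types:

#include <stdbool.h>
#include <stdint.h>

#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3

/* Pick the retry-round budget: from the supplied epset, else the first candidate, else a fixed default. */
int32_t pickRoundTotal(bool hasEpSet, int32_t epSetEps, bool hasCandidate, int32_t candidateEps) {
  if (hasEpSet) return epSetEps;
  if (hasCandidate) return candidateEps;
  return SCH_DEFAULT_RETRY_TOTAL_ROUND;   /* previously this case read an empty candidate list */
}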


@ -3657,6 +3657,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
if (pCur->iter == NULL) {
streamStateFreeCur(pCur);
return NULL;
}
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@ -3679,6 +3683,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);


@ -24,7 +24,8 @@
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
int32_t* totalBlocks);
bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState pState = streamTaskGetStatus(pTask);
@ -95,17 +96,53 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return code;
}
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
SArray* pRes) {
SSDataBlock block = {0};
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
if (num != 1) {
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
return TSDB_CODE_INVALID_PARA;
}
void* p = taosArrayGet(pRetrieveBlock->blocks, 0);
int32_t code = assignOneDataBlock(&block, p);
if (code) {
stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->info.selfChildId;
p = taosArrayPush(pRes, &block);
if (p != NULL) {
(*pNumOfBlocks) += 1;
stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pRetrieveBlock->reqId);
} else {
code = terrno;
stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64" code:%s", pTask->id.idStr,
pRetrieveBlock->reqId, tstrerror(code));
}
return code;
}
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
int32_t size = 0;
int32_t numOfBlocks = 0;
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
SArray* pRes = NULL;
*totalBlocks = 0;
*totalSize = 0;
while (1) {
SSDataBlock* output = NULL;
uint64_t ts = 0;
if (pRes == NULL) {
pRes = taosArrayInit(4, sizeof(SSDataBlock));
}
@ -115,8 +152,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
return code;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
resetTaskInfo(pExecutor);
@ -124,6 +159,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code;
} else {
qResetTaskCode(pExecutor);
@ -133,33 +169,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
if (num != 1) {
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
continue;
}
code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
if (code) {
stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code));
continue;
}
block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->info.selfChildId;
void* p = taosArrayPush(pRes, &block);
if (p != NULL) {
numOfBlocks += 1;
} else {
stError("s-task:%s failed to add retrieve block", pTask->id.idStr);
}
stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr,
pTask->info.selfChildId, pRetrieveBlock->reqId);
code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes);
if (code) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code;
}
}
break;
@ -189,11 +203,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
void* p = taosArrayPush(pRes, &block);
if (p == NULL) {
stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
} else {
stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
}
stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
// current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
@ -303,7 +317,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
bool finished = false;
const char* id = pTask->id.idStr;
if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
}
@ -408,7 +422,7 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
}
} else {
if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
status == TASK_STATUS__STOP)) {
status == TASK_STATUS__STOP)) {
stError("s-task:%s invalid task status:%d", id, status);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
@ -718,7 +732,7 @@ int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpoi
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
if(code) {
if (code) {
stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
}
@ -833,7 +847,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (pState.state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
} else { // todo refactor
} else { // todo refactor
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {


@ -99,8 +99,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
}
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
@ -170,12 +170,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
int32_t lino = 0;
void* pVal = NULL;
int32_t len = getRowStateRowSize(pState->pFileState);
int32_t tmpLen = len;
int32_t tmpLen = len;
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen);
QUERY_CHECK_CODE(code, lino, _end);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
_end:
@ -189,12 +189,12 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa
int32_t lino = 0;
void* pVal = NULL;
int32_t len = getRowStateRowSize(pState->pFileState);
int32_t tmpLen = len;
int32_t tmpLen = len;
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen);
QUERY_CHECK_CODE(code, lino, _end);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
streamStateReleaseBuf(pState, pVal, false);


@ -937,6 +937,7 @@ static int walFindCurMetaVer(SWal* pWal) {
TdDirPtr pDir = taosOpenDir(pWal->path);
if (pDir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
regfree(&walMetaRegexPattern);
return terrno;
}
@ -956,6 +957,7 @@ static int walFindCurMetaVer(SWal* pWal) {
}
if (taosCloseDir(&pDir) != 0) {
wError("failed to close dir, ret:%s", tstrerror(terrno));
regfree(&walMetaRegexPattern);
return terrno;
}
regfree(&walMetaRegexPattern);
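
The two added regfree calls release the compiled pattern on the early-return paths, matching the regfree already done on the normal exit. A self-contained sketch of that compile-once / free-on-every-exit discipline with POSIX regex (the pattern string is a placeholder, not the WAL's actual expression):

#include <regex.h>

/* Returns 1 on match, 0 on no match, -1 on error; the compiled regex never leaks. */
int matchesMetaFileName(const char *name) {
  regex_t re;
  if (regcomp(&re, "^meta-ver[0-9]+$", REG_EXTENDED) != 0) {
    return -1;                  /* nothing was compiled, so nothing needs freeing */
  }
  int rc = regexec(&re, name, 0, NULL, 0);
  regfree(&re);                 /* released before every return that follows a successful regcomp */
  return (rc == 0) ? 1 : 0;
}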


@ -907,7 +907,7 @@ class TDTestCase:
## {. . .}
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
tdSql.checkRows(12)
tdSql.checkRows(13)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 5)
tdSql.checkData(2, 0, 10)
@ -920,6 +920,7 @@ class TDTestCase:
tdSql.checkData(9, 0, 15)
tdSql.checkData(10, 0, 15)
tdSql.checkData(11, 0, 15)
tdSql.checkData(12, 0, None)
## {} ...
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:01', '2020-02-01 00:00:04') every(1s) fill(next)")
@ -957,10 +958,12 @@ class TDTestCase:
## ..{.}
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:13', '2020-02-01 00:00:17') every(1s) fill(next)")
tdSql.checkRows(3)
tdSql.checkRows(5)
tdSql.checkData(0, 0, 15)
tdSql.checkData(1, 0, 15)
tdSql.checkData(2, 0, 15)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
## ... {}
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(next)")
@ -1272,7 +1275,7 @@ class TDTestCase:
tdSql.checkData(8, 1, True)
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
tdSql.checkRows(12)
tdSql.checkRows(13)
tdSql.checkCols(3)
tdSql.checkData(0, 0, '2020-02-01 00:00:04.000')
@ -1287,6 +1290,7 @@ class TDTestCase:
tdSql.checkData(9, 0, '2020-02-01 00:00:13.000')
tdSql.checkData(10, 0, '2020-02-01 00:00:14.000')
tdSql.checkData(11, 0, '2020-02-01 00:00:15.000')
tdSql.checkData(12, 0, '2020-02-01 00:00:16.000')
tdSql.checkData(0, 1, True)
tdSql.checkData(1, 1, False)
@ -1300,6 +1304,7 @@ class TDTestCase:
tdSql.checkData(9, 1, True)
tdSql.checkData(10, 1, True)
tdSql.checkData(11, 1, False)
tdSql.checkData(12, 1, True)
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-01 00:00:15') every(2s) fill(next)")
tdSql.checkRows(6)
@ -1677,9 +1682,13 @@ class TDTestCase:
## | . | { | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(next)")
tdSql.checkRows(2)
tdSql.checkRows(6)
tdSql.checkData(0, 0, 15)
tdSql.checkData(1, 0, 15)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
tdSql.checkData(5, 0, None)
# test fill linear
@ -2732,7 +2741,7 @@ class TDTestCase:
tdSql.checkData(4, i, 15)
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(next)")
tdSql.checkRows(3)
tdSql.checkRows(5)
tdSql.checkCols(4)
for i in range (tdSql.queryCols):
@ -2828,7 +2837,7 @@ class TDTestCase:
# test fill next
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(next)")
tdSql.checkRows(18)
tdSql.checkRows(19)
tdSql.checkCols(3)
tdSql.checkData(0, 0, '2020-02-02 00:00:00.000')
@ -2851,6 +2860,7 @@ class TDTestCase:
tdSql.checkData(15, 2, None)
tdSql.checkData(16, 2, None)
tdSql.checkData(17, 2, None)
tdSql.checkData(18, 2, None)
tdSql.checkData(17, 0, '2020-02-02 00:00:17.000')
@ -3081,7 +3091,7 @@ class TDTestCase:
# test fill linear
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
tdSql.checkRows(18)
tdSql.checkCols(3)
tdSql.checkData(0, 0, '2020-02-02 00:00:01.000')
@ -3103,8 +3113,9 @@ class TDTestCase:
tdSql.checkData(14, 2, None)
tdSql.checkData(15, 2, None)
tdSql.checkData(16, 2, None)
tdSql.checkData(17, 2, None)
tdSql.checkData(16, 0, '2020-02-02 00:00:17.000')
tdSql.checkData(17, 0, '2020-02-02 00:00:18.000')
tdLog.printNoPrefix("==========step13:test error cases")
@ -3220,7 +3231,7 @@ class TDTestCase:
tdSql.checkData(17, 1, True)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(18)
tdSql.checkRows(19)
tdSql.checkData(0, 0, '2020-02-01 00:00:00.000')
tdSql.checkData(0, 1, True)
@ -3243,9 +3254,12 @@ class TDTestCase:
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 17)
tdSql.checkData(17, 2, 17)
tdSql.checkData(18, 2, None)
tdSql.checkData(17, 0, '2020-02-01 00:00:17.000')
tdSql.checkData(17, 1, False)
tdSql.checkData(18, 0, '2020-02-01 00:00:18.000')
tdSql.checkData(18, 1, True)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
@ -3362,24 +3376,24 @@ class TDTestCase:
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(48)
for i in range(0, 14):
tdSql.checkRows(57)
for i in range(0, 19):
tdSql.checkData(i, 0, 'ctb1')
for i in range(14, 30):
for i in range(19, 38):
tdSql.checkData(i, 0, 'ctb2')
for i in range(30, 48):
for i in range(38, 57):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(13, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(18, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(29, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(19, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(37, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(30, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(47, 1, '2020-02-01 00:00:17.000')
tdSql.checkData(38, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(56, 1, '2020-02-01 00:00:18.000')
for i in range(0, 2):
tdSql.checkData(i, 3, 1)
@ -3390,24 +3404,33 @@ class TDTestCase:
for i in range(8, 14):
tdSql.checkData(i, 3, 13)
for i in range(14, 18):
for i in range(14, 19):
tdSql.checkData(i, 3, None)
for i in range(19, 23):
tdSql.checkData(i, 3, 3)
for i in range(18, 24):
for i in range(23, 29):
tdSql.checkData(i, 3, 9)
for i in range(24, 30):
for i in range(29, 35):
tdSql.checkData(i, 3, 15)
for i in range(30, 36):
for i in range(35, 38):
tdSql.checkData(i, 3, None)
for i in range(38, 44):
tdSql.checkData(i, 3, 5)
for i in range(36, 42):
for i in range(44, 50):
tdSql.checkData(i, 3, 11)
for i in range(42, 48):
for i in range(50, 56):
tdSql.checkData(i, 3, 17)
for i in range(56, 57):
tdSql.checkData(i, 3, None)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
@ -3450,7 +3473,7 @@ class TDTestCase:
tdSql.checkRows(90)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(90)
tdSql.checkRows(171)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(9)
@ -3467,7 +3490,7 @@ class TDTestCase:
tdSql.checkRows(48)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(48)
tdSql.checkRows(57)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
@ -4363,7 +4386,7 @@ class TDTestCase:
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{tbname_null} range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
tdSql.checkRows(9)
tdSql.checkRows(11)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, False)
@ -4373,6 +4396,8 @@ class TDTestCase:
tdSql.checkData(6, 1, True)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, False)
tdSql.checkData(9, 1, True)
tdSql.checkData(10, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 3)
@ -4383,11 +4408,13 @@ class TDTestCase:
tdSql.checkData(6, 2, 8)
tdSql.checkData(7, 2, 8)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, None)
tdSql.checkData(10, 2, None)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{tbname_null} where c0 is not null range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
tdSql.checkRows(9)
tdSql.checkRows(11)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, False)
@ -4397,6 +4424,9 @@ class TDTestCase:
tdSql.checkData(6, 1, True)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, False)
tdSql.checkData(9, 1, True)
tdSql.checkData(10, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 3)
@ -4407,6 +4437,8 @@ class TDTestCase:
tdSql.checkData(6, 2, 8)
tdSql.checkData(7, 2, 8)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, None)
tdSql.checkData(10, 2, None)
# super table
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
@ -4443,7 +4475,7 @@ class TDTestCase:
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(8)
tdSql.checkRows(9)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
@ -4452,6 +4484,7 @@ class TDTestCase:
tdSql.checkData(5, 1, True)
tdSql.checkData(6, 1, False)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 9)
@ -4461,11 +4494,12 @@ class TDTestCase:
tdSql.checkData(5, 2, 13)
tdSql.checkData(6, 2, 13)
tdSql.checkData(7, 2, 15)
tdSql.checkData(8, 2, None)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(8)
tdSql.checkRows(9)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
@ -4474,6 +4508,7 @@ class TDTestCase:
tdSql.checkData(5, 1, True)
tdSql.checkData(6, 1, False)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 9)
@ -4483,36 +4518,37 @@ class TDTestCase:
tdSql.checkData(5, 2, 13)
tdSql.checkData(6, 2, 13)
tdSql.checkData(7, 2, 15)
tdSql.checkData(8, 2, None)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(15)
for i in range(0, 7):
tdSql.checkRows(18)
for i in range(0, 9):
tdSql.checkData(i, 0, 'ctb1_null')
for i in range(7, 15):
for i in range(9, 18):
tdSql.checkData(i, 0, 'ctb2_null')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(6, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(8, 1, '2020-02-01 00:00:17.000')
tdSql.checkData(7, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(9, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(17, 1, '2020-02-01 00:00:17.000')
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(15)
for i in range(0, 7):
tdSql.checkRows(18)
for i in range(0, 9):
tdSql.checkData(i, 0, 'ctb1_null')
for i in range(7, 15):
for i in range(9, 18):
tdSql.checkData(i, 0, 'ctb2_null')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(6, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(8, 1, '2020-02-01 00:00:17.000')
tdSql.checkData(7, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(9, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(17, 1, '2020-02-01 00:00:17.000')
# fill linear
# normal table