Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-22023
This commit is contained in:
commit
b16fc7d9df
|
@ -211,6 +211,7 @@ typedef struct SStoreTqReader {
|
||||||
bool (*tqNextBlockImpl)(); // todo remove it
|
bool (*tqNextBlockImpl)(); // todo remove it
|
||||||
SSDataBlock* (*tqGetResultBlock)();
|
SSDataBlock* (*tqGetResultBlock)();
|
||||||
int64_t (*tqGetResultBlockTime)();
|
int64_t (*tqGetResultBlockTime)();
|
||||||
|
int32_t (*tqGetStreamExecProgress)();
|
||||||
|
|
||||||
void (*tqReaderSetColIdList)();
|
void (*tqReaderSetColIdList)();
|
||||||
int32_t (*tqReaderSetQueryTableList)();
|
int32_t (*tqReaderSetQueryTableList)();
|
||||||
|
@ -266,16 +267,11 @@ typedef struct SStoreMeta {
|
||||||
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
||||||
int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list);
|
int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list);
|
||||||
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
|
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
|
||||||
void* storeGetVersionRange;
|
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid);
|
||||||
void* storeGetLastTimestamp;
|
|
||||||
|
|
||||||
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
|
|
||||||
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
|
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
|
||||||
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
|
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
|
||||||
int64_t* numOfNormalTables);
|
int64_t* numOfNormalTables);
|
||||||
|
|
||||||
int64_t (*getNumOfRowsInMem)(void* pVnode);
|
|
||||||
|
|
||||||
SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock);
|
SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock);
|
||||||
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
|
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
|
||||||
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
|
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
|
||||||
|
|
|
@ -313,7 +313,7 @@ typedef struct SCheckpointInfo {
|
||||||
int64_t failedId; // record the latest failed checkpoint id
|
int64_t failedId; // record the latest failed checkpoint id
|
||||||
int64_t checkpointingId;
|
int64_t checkpointingId;
|
||||||
int32_t downstreamAlignNum;
|
int32_t downstreamAlignNum;
|
||||||
int32_t checkpointNotReadyTasks;
|
int32_t numOfNotReady;
|
||||||
bool dispatchCheckpointTrigger;
|
bool dispatchCheckpointTrigger;
|
||||||
int64_t msgVer;
|
int64_t msgVer;
|
||||||
int32_t transId;
|
int32_t transId;
|
||||||
|
|
|
@ -90,6 +90,8 @@ int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num);
|
||||||
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
||||||
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
|
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
|
||||||
|
|
||||||
|
int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||||
|
|
||||||
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad);
|
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad);
|
||||||
|
@ -180,7 +182,6 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList,
|
||||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||||
SArray *pTableUids);
|
SArray *pTableUids);
|
||||||
void *tsdbCacherowsReaderClose(void *pReader);
|
void *tsdbCacherowsReaderClose(void *pReader);
|
||||||
int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
|
||||||
|
|
||||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||||
|
@ -233,6 +234,7 @@ int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, i
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
|
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
|
||||||
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
||||||
|
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished);
|
||||||
|
|
||||||
// sma
|
// sma
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
|
|
|
@ -97,7 +97,6 @@ typedef struct {
|
||||||
struct STQ {
|
struct STQ {
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
char* path;
|
char* path;
|
||||||
int64_t walLogLastVer;
|
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
SHashObj* pPushMgr; // subKey -> STqHandle
|
SHashObj* pPushMgr; // subKey -> STqHandle
|
||||||
SHashObj* pHandle; // subKey -> STqHandle
|
SHashObj* pHandle; // subKey -> STqHandle
|
||||||
|
@ -153,13 +152,13 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
||||||
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
|
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||||
int32_t type, int64_t sver, int64_t ever);
|
int32_t type, int64_t sver, int64_t ever);
|
||||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
|
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
|
||||||
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
|
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
|
||||||
int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
SSubmitTbData* pTableData, const char* id);
|
SSubmitTbData* pTableData, const char* id);
|
||||||
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
|
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
|
||||||
|
|
||||||
|
|
|
@ -279,6 +279,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
|
||||||
// tsdbRead.c ==============================================================================================
|
// tsdbRead.c ==============================================================================================
|
||||||
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
|
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
|
||||||
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
|
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
|
||||||
|
int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid);
|
||||||
|
|
||||||
// tsdbMerge.c ==============================================================================================
|
// tsdbMerge.c ==============================================================================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -970,8 +971,6 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
||||||
return pIter->pRow;
|
return pIter->pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tRowInfoCmprFn(const void *p1, const void *p2);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
|
|
|
@ -1554,7 +1554,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
||||||
}
|
}
|
||||||
_resume_delete:
|
_resume_delete:
|
||||||
version = RSMA_EXEC_MSG_VER(msg);
|
version = RSMA_EXEC_MSG_VER(msg);
|
||||||
if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
|
if ((terrno = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
|
||||||
&packData.pDataBlock, 1))) {
|
&packData.pDataBlock, 1))) {
|
||||||
taosFreeQitem(msg);
|
taosFreeQitem(msg);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -203,7 +203,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
||||||
|
|
||||||
int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||||
if (index == NULL) { // no data yet, append it
|
if (index == NULL) { // no data yet, append it
|
||||||
code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
|
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -213,7 +213,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
||||||
int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
|
int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
|
||||||
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
||||||
} else {
|
} else {
|
||||||
code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
|
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
|
|
||||||
pTq->path = taosStrdup(path);
|
pTq->path = taosStrdup(path);
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
|
||||||
|
|
||||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
|
taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
|
||||||
|
@ -1055,7 +1054,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
||||||
|
|
||||||
// let's continue scan data in the wal files
|
// let's continue scan data in the wal files
|
||||||
if(code == 0 && pReq->reqType >= 0){
|
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
|
||||||
tqScanWalAsync(pTq, false);
|
tqScanWalAsync(pTq, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -344,7 +344,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
||||||
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
code = extractDelDataBlock(pBody, len, ver, (void**)pItem, 0);
|
code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
if (*pItem == NULL) {
|
if (*pItem == NULL) {
|
||||||
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
||||||
|
|
|
@ -746,7 +746,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
SSubmitTbData* pTableData, const char* id) {
|
SSubmitTbData* pTableData, const char* id) {
|
||||||
int32_t numOfRows = pDataBlock->info.rows;
|
int32_t numOfRows = pDataBlock->info.rows;
|
||||||
|
|
||||||
|
@ -821,7 +821,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
|
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -868,7 +868,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
|
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -878,7 +878,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
|
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
|
||||||
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
||||||
} else {
|
} else {
|
||||||
code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
|
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -388,7 +388,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
|
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
|
||||||
SDecoder* pCoder = &(SDecoder){0};
|
SDecoder* pCoder = &(SDecoder){0};
|
||||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
SDeleteRes* pRes = &(SDeleteRes){0};
|
||||||
|
|
||||||
|
@ -449,3 +449,73 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void**
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
|
||||||
|
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
||||||
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (pDelay != NULL) {
|
||||||
|
*pDelay = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
*fhFinished = false;
|
||||||
|
|
||||||
|
if (numOfTasks <= 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract the required source task for a given stream, identified by streamId
|
||||||
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
if (pId->streamId != streamId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
|
if (ppTask == NULL) {
|
||||||
|
tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*ppTask)->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// here we get the required stream source task
|
||||||
|
SStreamTask* pTask = *ppTask;
|
||||||
|
*fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
|
||||||
|
|
||||||
|
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
|
|
||||||
|
SVersionRange verRange = {0};
|
||||||
|
walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);
|
||||||
|
|
||||||
|
SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
|
||||||
|
if (pReader == NULL) {
|
||||||
|
tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t cur = 0;
|
||||||
|
int64_t latest = 0;
|
||||||
|
|
||||||
|
code = walFetchHead(pReader, ver);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
cur = pReader->pHead->head.ingestTs;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = walFetchHead(pReader, verRange.maxVer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
latest = pReader->pHead->head.ingestTs;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDelay != NULL) { // delay in ms
|
||||||
|
*pDelay = (latest - cur) / 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
walCloseReader(pReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
|
@ -2628,6 +2628,58 @@ static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) {
|
||||||
return (pStatus->pProcMemTableIter != NULL);
|
return (pStatus->pProcMemTableIter != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) {
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
||||||
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
|
SDataBlockInfo* pInfo = &pResBlock->info;
|
||||||
|
blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt);
|
||||||
|
|
||||||
|
pInfo->rows = pScanInfo->numOfRowsInStt;
|
||||||
|
pInfo->id.uid = pScanInfo->uid;
|
||||||
|
pInfo->dataLoad = 1;
|
||||||
|
pInfo->window = pScanInfo->sttWindow;
|
||||||
|
|
||||||
|
setComposedBlockFlag(pReader, true);
|
||||||
|
|
||||||
|
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
||||||
|
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||||
|
pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
||||||
|
pScanInfo->sttBlockReturned = true;
|
||||||
|
|
||||||
|
pSttBlockReader->mergeTree.pIter = NULL;
|
||||||
|
|
||||||
|
tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s",
|
||||||
|
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
|
pResBlock->info.rows, pReader->idStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo,
|
||||||
|
SFileDataBlockInfo* pBlockInfo, int32_t blockIndex) {
|
||||||
|
// whole block is required, return it directly
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
|
pInfo->rows = pBlockInfo->numRow;
|
||||||
|
pInfo->id.uid = pScanInfo->uid;
|
||||||
|
pInfo->dataLoad = 0;
|
||||||
|
pInfo->version = pReader->info.verRange.maxVer;
|
||||||
|
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
|
||||||
|
setComposedBlockFlag(pReader, false);
|
||||||
|
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
|
||||||
|
|
||||||
|
// update the last key for the corresponding table
|
||||||
|
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||||
|
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
|
||||||
|
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||||
|
pReader, pScanInfo->uid, blockIndex, pBlockInfo->tbBlockIdx, pBlockInfo->numRow, pBlockInfo->firstKey,
|
||||||
|
pBlockInfo->lastKey, pReader->idStr);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
||||||
|
@ -2680,28 +2732,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
// if only require the total rows, no need to load data from stt file if it is clean stt blocks
|
// if only require the total rows, no need to load data from stt file if it is clean stt blocks
|
||||||
if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) {
|
if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) {
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
buildCleanBlockFromSttFiles(pReader, pScanInfo);
|
||||||
|
|
||||||
SDataBlockInfo* pInfo = &pResBlock->info;
|
|
||||||
blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt);
|
|
||||||
|
|
||||||
pInfo->rows = pScanInfo->numOfRowsInStt;
|
|
||||||
pInfo->id.uid = pScanInfo->uid;
|
|
||||||
pInfo->dataLoad = 1;
|
|
||||||
pInfo->window = pScanInfo->sttWindow;
|
|
||||||
|
|
||||||
setComposedBlockFlag(pReader, true);
|
|
||||||
|
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
|
||||||
pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
|
||||||
pScanInfo->sttBlockReturned = true;
|
|
||||||
|
|
||||||
pSttBlockReader->mergeTree.pIter = NULL;
|
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s",
|
|
||||||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
|
||||||
pResBlock->info.rows, pReader->idStr);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2741,10 +2772,11 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
|
// current active data block not overlap with the stt-files/stt-blocks
|
||||||
|
static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||||
|
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
|
if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||||
|
@ -2794,24 +2826,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
|
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
|
||||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||||
} else {
|
} else {
|
||||||
if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) {
|
if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) {
|
||||||
// whole block is required, return it directly
|
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||||
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
|
||||||
pInfo->rows = pBlockInfo->numRow;
|
|
||||||
pInfo->id.uid = pScanInfo->uid;
|
|
||||||
pInfo->dataLoad = 0;
|
|
||||||
pInfo->version = pReader->info.verRange.maxVer;
|
|
||||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
|
|
||||||
setComposedBlockFlag(pReader, false);
|
|
||||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
|
|
||||||
|
|
||||||
// update the last key for the corresponding table
|
if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) ||
|
||||||
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
|
(!asc && pBlockInfo->firstKey > keyInStt)) {
|
||||||
tsdbDebug("%p uid:%" PRIu64
|
if (pScanInfo->cleanSttBlocks && hasDataInSttBlock(pScanInfo)) {
|
||||||
" clean file block retrieved from file, global index:%d, "
|
if (asc) { // file block is located before the stt block
|
||||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey);
|
||||||
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->numRow,
|
} else { // stt block is before the file block
|
||||||
pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index);
|
||||||
|
} else { // clean stt block
|
||||||
|
if (asc) {
|
||||||
|
ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey);
|
||||||
|
} else {
|
||||||
|
ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the stt file block
|
||||||
|
ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL);
|
||||||
|
buildCleanBlockFromSttFiles(pReader, pScanInfo);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SBlockData* pBData = &pReader->status.fileBlockData;
|
SBlockData* pBData = &pReader->status.fileBlockData;
|
||||||
tBlockDataReset(pBData);
|
tBlockDataReset(pBData);
|
||||||
|
@ -2822,7 +2862,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
||||||
pScanInfo->cleanSttBlocks = false;
|
|
||||||
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
|
||||||
// no data in stt block, no need to proceed.
|
// no data in stt block, no need to proceed.
|
||||||
|
@ -2840,8 +2879,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
||||||
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if ((lastKeyInStt >= pBlockInfo->firstKey && asc) ||
|
if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) {
|
||||||
(lastKeyInStt <= pBlockInfo->lastKey && (!asc))) {
|
|
||||||
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
||||||
lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
||||||
break;
|
break;
|
||||||
|
@ -4995,9 +5033,9 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) {
|
||||||
return rows;
|
return rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
|
int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
|
metaReaderDoInit(&mr, pMeta, 0);
|
||||||
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
|
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
|
@ -5027,7 +5065,7 @@ int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
// get the newest table schema version
|
// get the newest table schema version
|
||||||
code = metaGetTbTSchemaEx(((SVnode*)pVnode)->pMeta, *suid, uid, -1, pSchema);
|
code = metaGetTbTSchemaEx(pMeta, *suid, uid, -1, pSchema);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -91,7 +91,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
||||||
pMeta->getTableTypeByName = metaGetTableTypeByName;
|
pMeta->getTableTypeByName = metaGetTableTypeByName;
|
||||||
pMeta->getTableNameByUid = metaGetTableNameByUid;
|
pMeta->getTableNameByUid = metaGetTableNameByUid;
|
||||||
|
|
||||||
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
|
pMeta->getTableSchema = vnodeGetTableSchema;
|
||||||
pMeta->storeGetTableList = vnodeGetTableList;
|
pMeta->storeGetTableList = vnodeGetTableList;
|
||||||
|
|
||||||
pMeta->getCachedTableList = metaGetCachedTableUidList;
|
pMeta->getCachedTableList = metaGetCachedTableUidList;
|
||||||
|
@ -135,7 +135,9 @@ void initTqAPI(SStoreTqReader* pTq) {
|
||||||
|
|
||||||
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||||
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
||||||
}
|
|
||||||
|
pTq->tqGetStreamExecProgress = tqGetStreamExecInfo;
|
||||||
|
}
|
||||||
|
|
||||||
void initStateStoreAPI(SStateStore* pStore) {
|
void initStateStoreAPI(SStateStore* pStore) {
|
||||||
pStore->streamFileStateInit = streamFileStateInit;
|
pStore->streamFileStateInit = streamFileStateInit;
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
|
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -703,3 +704,7 @@ void *vnodeGetIvtIdx(void *pVnode) {
|
||||||
}
|
}
|
||||||
return metaGetIvtIdx(((SVnode *)pVnode)->pMeta);
|
return metaGetIvtIdx(((SVnode *)pVnode)->pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) {
|
||||||
|
return tsdbGetTableSchema(((SVnode*)pVnode)->pMeta, uid, pSchema, suid);
|
||||||
|
}
|
||||||
|
|
|
@ -220,7 +220,6 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi
|
||||||
|
|
||||||
(*pResult)->win = *win;
|
(*pResult)->win = *win;
|
||||||
|
|
||||||
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
|
|
||||||
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
|
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -262,6 +261,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
|
||||||
} else if (pInfo->groupId != gid) {
|
} else if (pInfo->groupId != gid) {
|
||||||
// this is a new group, reset the info
|
// this is a new group, reset the info
|
||||||
pInfo->inWindow = false;
|
pInfo->inWindow = false;
|
||||||
|
pInfo->groupId = gid;
|
||||||
}
|
}
|
||||||
|
|
||||||
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
|
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
|
||||||
|
@ -319,6 +319,9 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
|
||||||
doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid);
|
doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid);
|
||||||
pInfo->inWindow = true;
|
pInfo->inWindow = true;
|
||||||
startIndex = rowIndex;
|
startIndex = rowIndex;
|
||||||
|
if (pInfo->pRow != NULL) {
|
||||||
|
clearResultRowInitFlag(pSup->pCtx, pSup->numOfExprs);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -725,7 +725,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->isHistoryOp) {
|
if (pInfo->isHistoryOp) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
|
||||||
pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
|
pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
|
||||||
} else {
|
} else {
|
||||||
pInfo->pAllUpdated = NULL;
|
pInfo->pAllUpdated = NULL;
|
||||||
|
|
|
@ -8181,27 +8181,27 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
|
|
||||||
tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias));
|
tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias));
|
||||||
tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName));
|
tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName));
|
||||||
SNodeList* pParamterList = nodesMakeList();
|
SNodeList* pParameterList = nodesMakeList();
|
||||||
if (NULL == pParamterList) {
|
if (NULL == pParameterList) {
|
||||||
nodesDestroyNode((SNode*)col);
|
nodesDestroyNode((SNode*)col);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = nodesListStrictAppend(pParamterList, (SNode*)col);
|
int32_t code = nodesListStrictAppend(pParameterList, (SNode*)col);
|
||||||
if (code) {
|
if (code) {
|
||||||
nodesDestroyList(pParamterList);
|
nodesDestroyList(pParameterList);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* pFunc = (SNode*)createFunction("last", pParamterList);
|
SNode* pFunc = (SNode*)createFunction("last", pParameterList);
|
||||||
if (NULL == pFunc) {
|
if (NULL == pFunc) {
|
||||||
nodesDestroyList(pParamterList);
|
nodesDestroyList(pParameterList);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeList* pProjectionList = nodesMakeList();
|
SNodeList* pProjectionList = nodesMakeList();
|
||||||
if (NULL == pProjectionList) {
|
if (NULL == pProjectionList) {
|
||||||
nodesDestroyList(pParamterList);
|
nodesDestroyNode(pFunc);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8213,7 +8213,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
|
|
||||||
SFunctionNode* pFunc1 = createFunction("_vgid", NULL);
|
SFunctionNode* pFunc1 = createFunction("_vgid", NULL);
|
||||||
if (NULL == pFunc1) {
|
if (NULL == pFunc1) {
|
||||||
nodesDestroyList(pParamterList);
|
nodesDestroyList(pProjectionList);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8226,7 +8226,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
|
|
||||||
SFunctionNode* pFunc2 = createFunction("_vgver", NULL);
|
SFunctionNode* pFunc2 = createFunction("_vgver", NULL);
|
||||||
if (NULL == pFunc2) {
|
if (NULL == pFunc2) {
|
||||||
nodesDestroyList(pParamterList);
|
nodesDestroyList(pProjectionList);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8243,24 +8243,54 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo add the group by statement
|
|
||||||
SSelectStmt** pSelect1 = (SSelectStmt**)pQuery;
|
SSelectStmt** pSelect1 = (SSelectStmt**)pQuery;
|
||||||
(*pSelect1)->pGroupByList = nodesMakeList();
|
(*pSelect1)->pGroupByList = nodesMakeList();
|
||||||
|
if (NULL == (*pSelect1)->pGroupByList) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
SGroupingSetNode* pNode1 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
|
SGroupingSetNode* pNode1 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
|
||||||
|
if (NULL == pNode1) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pNode1->groupingSetType = GP_TYPE_NORMAL;
|
pNode1->groupingSetType = GP_TYPE_NORMAL;
|
||||||
pNode1->pParameterList = nodesMakeList();
|
pNode1->pParameterList = nodesMakeList();
|
||||||
nodesListAppend(pNode1->pParameterList, (SNode*)pFunc1);
|
if (NULL == pNode1->pParameterList) {
|
||||||
|
nodesDestroyNode((SNode*)pNode1);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1);
|
code = nodesListAppend(pNode1->pParameterList, (SNode*)pFunc1);
|
||||||
|
if (code) {
|
||||||
|
nodesDestroyNode((SNode*)pNode1);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
SGroupingSetNode* pNode2 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
|
SGroupingSetNode* pNode2 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
|
||||||
|
if (NULL == pNode2) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pNode2->groupingSetType = GP_TYPE_NORMAL;
|
pNode2->groupingSetType = GP_TYPE_NORMAL;
|
||||||
pNode2->pParameterList = nodesMakeList();
|
pNode2->pParameterList = nodesMakeList();
|
||||||
nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2);
|
if (NULL == pNode2->pParameterList) {
|
||||||
|
nodesDestroyNode((SNode*)pNode1);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
|
code = nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2);
|
||||||
|
if (code) {
|
||||||
|
nodesDestroyNode((SNode*)pNode2);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -737,6 +737,13 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
|
||||||
QW_ERR_JRET(code);
|
QW_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
SReadHandle* pReadHandle = qwMsg->node;
|
||||||
|
int64_t delay = 0;
|
||||||
|
bool fhFinish = false;
|
||||||
|
pReadHandle->api.tqReaderFn.tqGetStreamExecProgress(pReadHandle->vnode, 0, &delay, &fhFinish);
|
||||||
|
#endif
|
||||||
|
|
||||||
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
|
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
|
||||||
sql = NULL;
|
sql = NULL;
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -158,7 +158,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
||||||
|
|
||||||
pTask->chkInfo.transId = pReq->transId;
|
pTask->chkInfo.transId = pReq->transId;
|
||||||
pTask->chkInfo.checkpointingId = pReq->checkpointId;
|
pTask->chkInfo.checkpointingId = pReq->checkpointId;
|
||||||
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask);
|
||||||
pTask->chkInfo.startTs = taosGetTimestampMs();
|
pTask->chkInfo.startTs = taosGetTimestampMs();
|
||||||
pTask->execInfo.checkpoint += 1;
|
pTask->execInfo.checkpoint += 1;
|
||||||
|
|
||||||
|
@ -214,7 +214,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
||||||
continueDispatchCheckpointBlock(pBlock, pTask);
|
continueDispatchCheckpointBlock(pBlock, pTask);
|
||||||
} else { // only one task exists, no need to dispatch downstream info
|
} else { // only one task exists, no need to dispatch downstream info
|
||||||
atomic_add_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
|
atomic_add_fetch_32(&pTask->chkInfo.numOfNotReady, 1);
|
||||||
streamProcessCheckpointReadyMsg(pTask);
|
streamProcessCheckpointReadyMsg(pTask);
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
}
|
}
|
||||||
|
@ -249,7 +249,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
|
|
||||||
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
||||||
// can start local checkpoint procedure
|
// can start local checkpoint procedure
|
||||||
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
|
||||||
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
||||||
// already. And then, dispatch check point msg to all downstream tasks
|
// already. And then, dispatch check point msg to all downstream tasks
|
||||||
|
@ -268,7 +268,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
|
||||||
|
|
||||||
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
|
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
|
||||||
int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
|
int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.numOfNotReady, 1);
|
||||||
ASSERT(notReady >= 0);
|
ASSERT(notReady >= 0);
|
||||||
|
|
||||||
if (notReady == 0) {
|
if (notReady == 0) {
|
||||||
|
@ -287,7 +287,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
|
||||||
pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id
|
pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id
|
||||||
pTask->chkInfo.failedId = 0;
|
pTask->chkInfo.failedId = 0;
|
||||||
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
||||||
pTask->chkInfo.checkpointNotReadyTasks = 0;
|
pTask->chkInfo.numOfNotReady = 0;
|
||||||
pTask->chkInfo.transId = 0;
|
pTask->chkInfo.transId = 0;
|
||||||
pTask->chkInfo.dispatchCheckpointTrigger = false;
|
pTask->chkInfo.dispatchCheckpointTrigger = false;
|
||||||
|
|
||||||
|
|
|
@ -155,14 +155,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
||||||
*blockSize = 0;
|
*blockSize = 0;
|
||||||
|
|
||||||
// no available token in bucket for sink task, let's wait for a little bit
|
// no available token in bucket for sink task, let's wait for a little bit
|
||||||
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
|
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
|
||||||
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
|
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
|
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
|
||||||
stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,10 +70,9 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||||
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64,
|
||||||
", applied index:%" PRId64,
|
|
||||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
||||||
if (fetchVer > appliedVer){
|
if (fetchVer > appliedVer) {
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -86,10 +85,8 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int32_t type = pReader->pHead->head.msgType;
|
int32_t type = pReader->pHead->head.msgType;
|
||||||
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
||||||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
||||||
if (walFetchBody(pReader) < 0) {
|
int32_t code = walFetchBody(pReader);
|
||||||
return -1;
|
return (code == TSDB_CODE_SUCCESS)? 0:-1;
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
} else {
|
} else {
|
||||||
if (walSkipFetchBody(pReader) < 0) {
|
if (walSkipFetchBody(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -498,7 +498,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
pWal->writeHead.head.version = index;
|
pWal->writeHead.head.version = index;
|
||||||
pWal->writeHead.head.bodyLen = bodyLen;
|
pWal->writeHead.head.bodyLen = bodyLen;
|
||||||
pWal->writeHead.head.msgType = msgType;
|
pWal->writeHead.head.msgType = msgType;
|
||||||
pWal->writeHead.head.ingestTs = 0;
|
pWal->writeHead.head.ingestTs = taosGetTimestampUs();
|
||||||
|
|
||||||
// sync info for sync module
|
// sync info for sync module
|
||||||
pWal->writeHead.head.syncMeta = syncMeta;
|
pWal->writeHead.head.syncMeta = syncMeta;
|
||||||
|
|
|
@ -56,6 +56,8 @@ int32_t taosGetAppName(char* name, int32_t* len) {
|
||||||
char* end = strrchr(filepath, TD_DIRSEP[0]);
|
char* end = strrchr(filepath, TD_DIRSEP[0]);
|
||||||
if (end == NULL) {
|
if (end == NULL) {
|
||||||
end = filepath;
|
end = filepath;
|
||||||
|
} else {
|
||||||
|
end += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(name, end, TSDB_APP_NAME_LEN);
|
tstrncpy(name, end, TSDB_APP_NAME_LEN);
|
||||||
|
|
|
@ -977,6 +977,7 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#ifdef BUILD_NO_CALL
|
||||||
/*************************************************************************
|
/*************************************************************************
|
||||||
* STREAM COMPRESSION
|
* STREAM COMPRESSION
|
||||||
*************************************************************************/
|
*************************************************************************/
|
||||||
|
@ -2120,7 +2121,7 @@ int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut
|
||||||
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) {
|
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) {
|
||||||
return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData);
|
return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
/*************************************************************************
|
/*************************************************************************
|
||||||
* REGULAR COMPRESSION
|
* REGULAR COMPRESSION
|
||||||
*************************************************************************/
|
*************************************************************************/
|
||||||
|
|
|
@ -35,8 +35,8 @@
|
||||||
"start_timestamp":"now-12d",
|
"start_timestamp":"now-12d",
|
||||||
"columns": [
|
"columns": [
|
||||||
{ "type": "bool", "name": "bc"},
|
{ "type": "bool", "name": "bc"},
|
||||||
{ "type": "float", "name": "fc" },
|
{ "type": "float", "name": "fc", "min": 100, "max": 100},
|
||||||
{ "type": "double", "name": "dc"},
|
{ "type": "double", "name": "dc", "min": 200, "max": 200},
|
||||||
{ "type": "tinyint", "name": "ti"},
|
{ "type": "tinyint", "name": "ti"},
|
||||||
{ "type": "smallint", "name": "si" },
|
{ "type": "smallint", "name": "si" },
|
||||||
{ "type": "int", "name": "ic" },
|
{ "type": "int", "name": "ic" },
|
||||||
|
|
|
@ -29,7 +29,11 @@ from frame import *
|
||||||
|
|
||||||
class TDTestCase(TBase):
|
class TDTestCase(TBase):
|
||||||
updatecfgDict = {
|
updatecfgDict = {
|
||||||
"countAlwaysReturnValue" : "0"
|
"countAlwaysReturnValue" : "0",
|
||||||
|
"lossyColumns" : "float,double",
|
||||||
|
"fPrecision" : "0.000000001",
|
||||||
|
"dPrecision" : "0.00000000000000001",
|
||||||
|
"ifAdtFse" : "1"
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertData(self):
|
def insertData(self):
|
||||||
|
@ -48,6 +52,18 @@ class TDTestCase(TBase):
|
||||||
sql = f"create table {self.db}.ta(ts timestamp, age int) tags(area int)"
|
sql = f"create table {self.db}.ta(ts timestamp, age int) tags(area int)"
|
||||||
tdSql.execute(sql)
|
tdSql.execute(sql)
|
||||||
|
|
||||||
|
def checkFloatDouble(self):
|
||||||
|
sql = f"select * from {self.db}.{self.stb} where fc!=100"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = f"select count(*) from {self.db}.{self.stb} where dc!=200"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = f"select avg(fc) from {self.db}.{self.stb}"
|
||||||
|
tdSql.checkFirstValue(sql, 100)
|
||||||
|
sql = f"select avg(dc) from {self.db}.{self.stb}"
|
||||||
|
tdSql.checkFirstValue(sql, 200)
|
||||||
|
|
||||||
def doAction(self):
|
def doAction(self):
|
||||||
tdLog.info(f"do action.")
|
tdLog.info(f"do action.")
|
||||||
self.flushDb()
|
self.flushDb()
|
||||||
|
@ -85,6 +101,9 @@ class TDTestCase(TBase):
|
||||||
# check insert data correct
|
# check insert data correct
|
||||||
self.checkInsertCorrect()
|
self.checkInsertCorrect()
|
||||||
|
|
||||||
|
# check float double value ok
|
||||||
|
self.checkFloatDouble()
|
||||||
|
|
||||||
# save
|
# save
|
||||||
self.snapshotAgg()
|
self.snapshotAgg()
|
||||||
|
|
||||||
|
@ -97,6 +116,10 @@ class TDTestCase(TBase):
|
||||||
# check insert correct again
|
# check insert correct again
|
||||||
self.checkInsertCorrect()
|
self.checkInsertCorrect()
|
||||||
|
|
||||||
|
# check float double value ok
|
||||||
|
self.checkFloatDouble()
|
||||||
|
|
||||||
|
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
{
|
||||||
|
"filetype": "insert",
|
||||||
|
"cfgdir": "/etc/taos",
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 6030,
|
||||||
|
"user": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"connection_pool_size": 8,
|
||||||
|
"num_of_records_per_req": 3000,
|
||||||
|
"prepared_rand": 3000,
|
||||||
|
"thread_count": 2,
|
||||||
|
"create_table_thread_count": 1,
|
||||||
|
"confirm_parameter_prompt": "no",
|
||||||
|
"databases": [
|
||||||
|
{
|
||||||
|
"dbinfo": {
|
||||||
|
"name": "db",
|
||||||
|
"drop": "yes",
|
||||||
|
"vgroups": 2,
|
||||||
|
"replica": 3,
|
||||||
|
"wal_retention_period": 10,
|
||||||
|
"wal_retention_size": 100,
|
||||||
|
"keep": "60d,120d,365d",
|
||||||
|
"stt_trigger": 1,
|
||||||
|
"wal_level": 2,
|
||||||
|
"WAL_FSYNC_PERIOD": 3300,
|
||||||
|
"cachemodel": "'last_value'",
|
||||||
|
"TABLE_PREFIX":1,
|
||||||
|
"comp": 1
|
||||||
|
},
|
||||||
|
"super_tables": [
|
||||||
|
{
|
||||||
|
"name": "stb",
|
||||||
|
"child_table_exists": "no",
|
||||||
|
"childtable_count": 10,
|
||||||
|
"insert_rows": 100000,
|
||||||
|
"childtable_prefix": "d",
|
||||||
|
"insert_mode": "taosc",
|
||||||
|
"timestamp_step": 1000,
|
||||||
|
"start_timestamp":"now-360d",
|
||||||
|
"columns": [
|
||||||
|
{ "type": "bool", "name": "bc","max": 1,"min": 1},
|
||||||
|
{ "type": "float", "name": "fc" ,"max": 101,"min": 101},
|
||||||
|
{ "type": "double", "name": "dc" ,"max": 102,"min": 102},
|
||||||
|
{ "type": "tinyint", "name": "ti" ,"max": 103,"min": 103},
|
||||||
|
{ "type": "smallint", "name": "si" ,"max": 104,"min": 104},
|
||||||
|
{ "type": "int", "name": "ic" ,"max": 105,"min": 105},
|
||||||
|
{ "type": "bigint", "name": "bi" ,"max": 106,"min": 106},
|
||||||
|
{ "type": "utinyint", "name": "uti","max": 107,"min": 107},
|
||||||
|
{ "type": "usmallint", "name": "usi","max": 108,"min": 108},
|
||||||
|
{ "type": "uint", "name": "ui" ,"max": 109,"min": 109},
|
||||||
|
{ "type": "ubigint", "name": "ubi","max": 110,"min": 110},
|
||||||
|
{ "type": "binary", "name": "bin", "len": 16},
|
||||||
|
{ "type": "nchar", "name": "nch", "len": 32}
|
||||||
|
],
|
||||||
|
"tags": [
|
||||||
|
{"type": "tinyint", "name": "groupid","max": 100,"min": 100},
|
||||||
|
{"name": "location","type": "binary", "len": 16, "values":
|
||||||
|
["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -0,0 +1,140 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import frame
|
||||||
|
import frame.etool
|
||||||
|
|
||||||
|
|
||||||
|
from frame.log import *
|
||||||
|
from frame.cases import *
|
||||||
|
from frame.sql import *
|
||||||
|
from frame.caseBase import *
|
||||||
|
from frame import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase(TBase):
|
||||||
|
updatecfgDict = {
|
||||||
|
"compressMsgSize" : "100",
|
||||||
|
}
|
||||||
|
|
||||||
|
def insertData(self):
|
||||||
|
tdLog.info(f"insert data.")
|
||||||
|
# taosBenchmark run
|
||||||
|
jfile = etool.curFile(__file__, "oneStageComp.json")
|
||||||
|
etool.benchMark(json=jfile)
|
||||||
|
|
||||||
|
tdSql.execute(f"use {self.db}")
|
||||||
|
# set insert data information
|
||||||
|
self.childtable_count = 10
|
||||||
|
self.insert_rows = 100000
|
||||||
|
self.timestamp_step = 1000
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def checkColValueCorrect(self):
|
||||||
|
tdLog.info(f"do action.")
|
||||||
|
self.flushDb()
|
||||||
|
|
||||||
|
# check all columns correct
|
||||||
|
cnt = self.insert_rows * self.childtable_count
|
||||||
|
sql = "select * from stb where bc!=1"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where fc=101"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(cnt)
|
||||||
|
sql = "select * from stb where dc!=102"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where ti!=103"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where si!=104"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where ic!=105"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where bi!=106"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where uti!=107"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where usi!=108"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where ui!=109"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where ubi!=110"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
def insertNull(self):
|
||||||
|
# insert 6 lines
|
||||||
|
sql = "insert into d0(ts) values(now) (now + 1s) (now + 2s) (now + 3s) (now + 4s) (now + 5s)"
|
||||||
|
tdSql.execute(sql)
|
||||||
|
|
||||||
|
self.flushDb()
|
||||||
|
self.trimDb()
|
||||||
|
|
||||||
|
# check all columns correct
|
||||||
|
cnt = self.insert_rows * self.childtable_count
|
||||||
|
sql = "select * from stb where bc!=1"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
sql = "select * from stb where bc is null"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(6)
|
||||||
|
sql = "select * from stb where bc=1"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(cnt)
|
||||||
|
sql = "select * from stb where usi is null"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(6)
|
||||||
|
|
||||||
|
# run
|
||||||
|
def run(self):
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
|
||||||
|
# insert data
|
||||||
|
self.insertData()
|
||||||
|
|
||||||
|
# check insert data correct
|
||||||
|
self.checkInsertCorrect()
|
||||||
|
|
||||||
|
# save
|
||||||
|
self.snapshotAgg()
|
||||||
|
|
||||||
|
# do action
|
||||||
|
self.checkColValueCorrect()
|
||||||
|
|
||||||
|
# check save agg result correct
|
||||||
|
self.checkAggCorrect()
|
||||||
|
|
||||||
|
# insert null
|
||||||
|
self.insertNull()
|
||||||
|
|
||||||
|
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -23,7 +23,7 @@ fi
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
|
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
|
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
|
||||||
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
|
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
|
||||||
|
,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
|
|
|
@ -385,9 +385,11 @@ unsigned int optimize_intervals_double_1D_opt(double *oriData, size_t dataLength
|
||||||
totalSampleSize++;
|
totalSampleSize++;
|
||||||
pred_value = data_pos[-1];
|
pred_value = data_pos[-1];
|
||||||
pred_err = fabs(pred_value - *data_pos);
|
pred_err = fabs(pred_value - *data_pos);
|
||||||
radiusIndex = (unsigned long)((pred_err/realPrecision+1)/2);
|
double dbri = (unsigned long)((pred_err/realPrecision+1)/2);
|
||||||
if(radiusIndex>=confparams_cpr->maxRangeRadius)
|
if(dbri >= (double)confparams_cpr->maxRangeRadius)
|
||||||
radiusIndex = confparams_cpr->maxRangeRadius - 1;
|
radiusIndex = confparams_cpr->maxRangeRadius - 1;
|
||||||
|
else
|
||||||
|
radiusIndex = dbri;
|
||||||
intervals[radiusIndex]++;
|
intervals[radiusIndex]++;
|
||||||
|
|
||||||
data_pos += confparams_cpr->sampleDistance;
|
data_pos += confparams_cpr->sampleDistance;
|
||||||
|
|
Loading…
Reference in New Issue