Merge pull request #26743 from taosdata/fix/syntax
fix(stream): check return value, and do some internal refactor.
This commit is contained in:
commit
a09ad7a81e
|
@ -195,7 +195,7 @@ typedef struct SStoreCacheReader {
|
|||
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
|
||||
SArray *pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks);
|
||||
void *(*closeReader)(void *pReader);
|
||||
void (*closeReader)(void *pReader);
|
||||
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUidList);
|
||||
int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||
|
|
|
@ -756,7 +756,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1800, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "concurrentCheckpoint", tsMaxConcurrentCheckpoint, 1, 10, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -57,7 +57,10 @@ void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pList, pInfo);
|
||||
void* p = taosArrayPush(pList, pInfo);
|
||||
if (p == NULL) {
|
||||
mError("failed to push failed checkpoint info checkpointId:%" PRId64 " in list", pInfo->checkpointId);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||
|
@ -208,6 +211,8 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = NULL;
|
||||
void *pIter = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
@ -219,7 +224,17 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
|
|||
|
||||
int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
|
||||
void *pHead = rpcMallocCont(contLen);
|
||||
tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
|
||||
if (pHead == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
|
||||
if (code) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_PAUSE_STREAM,
|
||||
|
@ -228,14 +243,14 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
|
|||
.info = *info,
|
||||
};
|
||||
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
mInfo("receive pause stream:%s, %s, %" PRId64 ", because grant expired", pStream->name, reqPause.name,
|
||||
pStream->uid);
|
||||
code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
mInfo("receive pause stream:%s, %s, %" PRId64 ", because grant expired, code:%s", pStream->name, reqPause.name,
|
||||
pStream->uid, tstrerror(code));
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
|
@ -267,7 +282,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||
pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
streamMutexLock(&execInfo.lock);
|
||||
|
||||
mndInitStreamExecInfo(pMnode, &execInfo);
|
||||
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
|
||||
|
@ -276,7 +291,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
code = terrno = TSDB_CODE_INVALID_MSG;
|
||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
|
||||
return code;
|
||||
}
|
||||
|
@ -284,7 +299,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
||||
if (numOfUpdated > 0) {
|
||||
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
||||
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||
(void) setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||
}
|
||||
|
||||
bool snodeChanged = false;
|
||||
|
@ -296,7 +311,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
||||
|
||||
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
|
||||
taosArrayPush(pOrphanTasks, &oTask);
|
||||
void* px = taosArrayPush(pOrphanTasks, &oTask);
|
||||
if (px == NULL) {
|
||||
mError("Failed to put task into list, taskId:0x%" PRIx64, p->id.taskId);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -346,7 +364,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
addIntoCheckpointList(pFailedChkpt, &info);
|
||||
|
||||
// remove failed trans from pChkptStreams
|
||||
taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId));
|
||||
code = taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId));
|
||||
if (code) {
|
||||
mError("failed to remove stream:0x%"PRIx64" in checkpoint stream list", p->id.streamId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -386,7 +407,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||
pInfo->checkpointId, pInfo->transId);
|
||||
|
||||
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
||||
code = mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
||||
if (code) {
|
||||
mError("failed to create reset task trans, code:%s", tstrerror(code));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
|
||||
|
@ -395,20 +419,19 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
|
||||
// handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
|
||||
if (taosArrayGetSize(pOrphanTasks) > 0) {
|
||||
mndDropOrphanTasks(pMnode, pOrphanTasks);
|
||||
code = mndDropOrphanTasks(pMnode, pOrphanTasks);
|
||||
}
|
||||
|
||||
if (pMnode != NULL) { // make sure that the unit test case can work
|
||||
mndStreamStartUpdateCheckpointInfo(pMnode);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||
doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId);
|
||||
|
||||
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg
|
||||
|
@ -416,7 +439,10 @@ void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doC
|
|||
if (pMsg != NULL) {
|
||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
if (code) {
|
||||
mError("failed to put into write Queue, code:%s", tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,8 +24,7 @@ typedef struct SKeyInfo {
|
|||
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
|
||||
SStreamTransInfo info = {
|
||||
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
|
||||
taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
|
||||
return 0;
|
||||
return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
|
||||
}
|
||||
|
||||
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
|
||||
|
@ -45,7 +44,10 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
|
||||
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
|
||||
pEntry->startTime);
|
||||
taosArrayPush(pList, &info);
|
||||
void* p = taosArrayPush(pList, &info);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
} else {
|
||||
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
||||
num++;
|
||||
|
@ -57,7 +59,11 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
int32_t size = taosArrayGetSize(pList);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SKeyInfo *pKey = taosArrayGet(pList, i);
|
||||
taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
|
||||
int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
|
||||
if (code != 0) {
|
||||
taosArrayDestroy(pList);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size,
|
||||
|
@ -79,25 +85,28 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
// 2. create/drop/reset/update trans are conflict with any other trans.
|
||||
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
|
||||
if (lock) {
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
streamMutexLock(&execInfo.lock);
|
||||
}
|
||||
|
||||
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
|
||||
if (num <= 0) {
|
||||
if (lock) {
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
if (code) {
|
||||
mError("failed to clear finish trans, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
|
||||
if (pEntry != NULL) {
|
||||
SStreamTransInfo tInfo = *pEntry;
|
||||
|
||||
if (lock) {
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
|
||||
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
||||
|
@ -122,32 +131,36 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
|
|||
}
|
||||
|
||||
if (lock) {
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
streamMutexLock(&execInfo.lock);
|
||||
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
|
||||
if (num <= 0) {
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
if (code) {
|
||||
mError("failed to clear finish trans, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
|
||||
if (pEntry != NULL) {
|
||||
SStreamTransInfo tInfo = *pEntry;
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
|
||||
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 ||
|
||||
strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) {
|
||||
return tInfo.transId;
|
||||
}
|
||||
} else {
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -231,21 +244,21 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
|
|||
if (pCommitRaw == NULL) {
|
||||
mError("failed to encode stream since %s", terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (sdbSetRawStatus(pCommitRaw, status) != 0) {
|
||||
mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -303,8 +316,12 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
|||
void *pKey = taosHashGetKey(pDb, &len);
|
||||
char *p = strndup(pKey, len);
|
||||
|
||||
mDebug("clear checkpoint trans in Db:%s", p);
|
||||
doKillCheckpointTrans(pMnode, pKey, len);
|
||||
int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
|
||||
if (code) {
|
||||
mError("failed to kill trans, transId:%p", pKey)
|
||||
} else {
|
||||
mDebug("clear checkpoint trans in Db:%s", p);
|
||||
}
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
|
|
|
@ -744,8 +744,9 @@ int32_t mndInitExecInfo() {
|
|||
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
|
||||
SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
|
||||
int32_t size = taosArrayGetSize(pNodeSnapshot);
|
||||
int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
|
||||
for (int32_t i = 0; i < oldSize; ++i) {
|
||||
SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
|
||||
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
|
@ -763,7 +764,8 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
|
|||
taosArrayDestroy(execInfo.pNodeList);
|
||||
execInfo.pNodeList = pValidList;
|
||||
|
||||
mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList));
|
||||
mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
|
||||
(int32_t)taosArrayGetSize(pValidList), oldSize);
|
||||
}
|
||||
|
||||
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
|
||||
|
@ -1203,13 +1205,17 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
|
|||
}
|
||||
|
||||
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
|
||||
int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
|
||||
int32_t code = 0;
|
||||
int32_t numOfStreams = taosHashGetSize(pHash);
|
||||
if (numOfStreams == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
code = taosHashRemove(pHash, &streamId, sizeof(streamId));
|
||||
if (code == 0) {
|
||||
int32_t numOfStreams = taosHashGetSize(pHash);
|
||||
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d",
|
||||
streamId, numOfStreams);
|
||||
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
|
||||
} else {
|
||||
mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list", streamId);
|
||||
mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -185,7 +185,7 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList,
|
|||
SArray *pFuncTypeList, SColumnInfo *pkCol, int32_t numOfPks);
|
||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUids);
|
||||
void *tsdbCacherowsReaderClose(void *pReader);
|
||||
void tsdbCacherowsReaderClose(void *pReader);
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||
|
|
|
@ -1442,7 +1442,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
|
||||
} break;
|
||||
case TASK_TRIGGER_STAT_INIT: {
|
||||
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid::%" PRIi64 " since stat is init",
|
||||
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is init",
|
||||
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
|
||||
} break;
|
||||
default: {
|
||||
|
|
|
@ -23,20 +23,27 @@
|
|||
|
||||
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
|
||||
|
||||
static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) {
|
||||
char* buf = taosMemoryCalloc(1, pCol->info.bytes);
|
||||
static int32_t setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) {
|
||||
char* buf = taosMemoryCalloc(1, pCol->info.bytes);
|
||||
if (buf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SFirstLastRes* pRes = (SFirstLastRes*)((char*)buf + VARSTR_HEADER_SIZE);
|
||||
pRes->bytes = 0;
|
||||
pRes->hasResult = true;
|
||||
pRes->isNull = true;
|
||||
varDataSetLen(buf, pCol->info.bytes - VARSTR_HEADER_SIZE);
|
||||
colDataSetVal(pCol, row, buf, false);
|
||||
int32_t code = colDataSetVal(pCol, row, buf, false);
|
||||
taosMemoryFree(buf);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, const int32_t slotId,
|
||||
static int32_t saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, const int32_t slotId,
|
||||
SColumnInfoData* pColInfoData, int32_t numOfRows) {
|
||||
SColVal* pVal = &pColVal->colVal;
|
||||
int32_t code = 0;
|
||||
|
||||
// allNullRow = false;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.value.type)) {
|
||||
|
@ -46,17 +53,19 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c
|
|||
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
|
||||
|
||||
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
|
||||
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
|
||||
}
|
||||
} else {
|
||||
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
|
||||
}
|
||||
return;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
|
||||
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
int32_t code = 0;
|
||||
|
||||
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
uint64_t ts = TSKEY_MIN;
|
||||
|
@ -64,14 +73,33 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
col_id_t colId = -1;
|
||||
|
||||
SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t));
|
||||
if (funcTypeBlockArray == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
|
||||
if (pColInfoData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
|
||||
if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) {
|
||||
funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i);
|
||||
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
|
||||
void* pVal = taosArrayGet(pReader->pFuncTypeList, i);
|
||||
if (pVal == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
funcType = *(int32_t*) pVal;
|
||||
pVal = taosArrayGet(pReader->pFuncTypeList, i);
|
||||
if (pVal == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
void* px = taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], pVal);
|
||||
if (px == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (slotIds[i] == -1) {
|
||||
|
@ -79,15 +107,27 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
continue;
|
||||
}
|
||||
setFirstLastResColToNull(pColInfoData, numOfRows);
|
||||
|
||||
code = setFirstLastResColToNull(pColInfoData, numOfRows);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||
colId = pColVal->colVal.cid;
|
||||
if (pColVal == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
colId = pColVal->colVal.cid;
|
||||
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
|
||||
saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
|
||||
code = saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -112,13 +152,25 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it
|
||||
p->hasResult = true;
|
||||
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
|
||||
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
|
||||
if (pCol == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (idx < funcTypeBlockArray->size) {
|
||||
int32_t funcType = *(int32_t*)taosArrayGet(funcTypeBlockArray, idx);
|
||||
void* pVal = taosArrayGet(funcTypeBlockArray, idx);
|
||||
if (pVal == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t funcType = *(int32_t*)pVal;
|
||||
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
|
||||
continue;
|
||||
}
|
||||
|
@ -128,12 +180,18 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
if (ts == TSKEY_MIN) {
|
||||
colDataSetNULL(pCol, numOfRows);
|
||||
} else {
|
||||
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
|
||||
code = colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && (pCol->info.colId == colId || colId == -1)) {
|
||||
if (p && !p->isNull) {
|
||||
colDataSetVal(pCol, numOfRows, p->buf, false);
|
||||
code = colDataSetVal(pCol, numOfRows, p->buf, false);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
colDataSetNULL(pCol, numOfRows);
|
||||
}
|
||||
|
@ -146,15 +204,25 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
} else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||
if (pColInfoData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t slotId = slotIds[i];
|
||||
if (slotId == -1) {
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
continue;
|
||||
}
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||
|
||||
saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||
if (pColVal == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
code = saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// pBlock->info.rows += allNullRow ? 0 : 1;
|
||||
|
@ -164,7 +232,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
|
||||
|
@ -206,7 +274,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
|
|||
destroySttBlockReader(pReader->pLDataIterArray, NULL);
|
||||
pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return (pReader->pLDataIterArray != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
|
@ -269,18 +337,22 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
}
|
||||
|
||||
p->idstr = taosStrdup(idstr);
|
||||
taosThreadMutexInit(&p->readerMutex, NULL);
|
||||
code = taosThreadMutexInit(&p->readerMutex, NULL);
|
||||
if (code) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
return code;
|
||||
}
|
||||
|
||||
p->lastTs = INT64_MIN;
|
||||
|
||||
*pReader = p;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
void* tsdbCacherowsReaderClose(void* pReader) {
|
||||
void tsdbCacherowsReaderClose(void* pReader) {
|
||||
SCacheRowsReader* p = pReader;
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
if (p->pSchema != NULL) {
|
||||
|
@ -307,12 +379,12 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
}
|
||||
|
||||
if (p->pFileReader) {
|
||||
tsdbDataFileReaderClose(&p->pFileReader);
|
||||
(void) tsdbDataFileReaderClose(&p->pFileReader);
|
||||
p->pFileReader = NULL;
|
||||
}
|
||||
|
||||
taosMemoryFree((void*)p->idstr);
|
||||
taosThreadMutexDestroy(&p->readerMutex);
|
||||
(void) taosThreadMutexDestroy(&p->readerMutex);
|
||||
|
||||
if (p->pTableMap) {
|
||||
void* pe = NULL;
|
||||
|
@ -330,7 +402,6 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
}
|
||||
|
||||
taosMemoryFree(pReader);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void freeItemOfRow(void* pItem) {
|
||||
|
@ -363,8 +434,7 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) {
|
|||
// just wait for the big all tables' snapshot untaking for now
|
||||
|
||||
code = TSDB_CODE_VND_QUERY_BUSY;
|
||||
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
(void)taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
|
||||
return code;
|
||||
} else if (code == EBUSY) {
|
||||
|
@ -381,9 +451,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
SCacheRowsReader* pr = pReader;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
||||
bool hasRes = false;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
bool hasRes = false;
|
||||
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
||||
if (pRow == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
|
||||
if (pRes == NULL) {
|
||||
|
@ -391,16 +466,21 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
goto _end;
|
||||
}
|
||||
|
||||
int32_t pkBufLen = (pr->rowKey.numOfPKs > 0)? pr->pkColumn.bytes:0;
|
||||
int32_t pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0;
|
||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
|
||||
|
||||
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE);
|
||||
if (pRes[j] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
|
||||
p->ts = INT64_MIN;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pr->readerMutex);
|
||||
(void)taosThreadMutexLock(&pr->readerMutex);
|
||||
code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
|
@ -422,7 +502,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
int32_t slotId = slotIds[i];
|
||||
if (slotId == -1) {
|
||||
SLastCol p = {.rowKey.ts = INT64_MIN, .colVal.value.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL};
|
||||
taosArrayPush(pLastCols, &p);
|
||||
void* px = taosArrayPush(pLastCols, &p);
|
||||
if (px == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
struct STColumn* pCol = &pr->pSchema->columns[slotId];
|
||||
|
@ -433,15 +517,29 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
for (int32_t j = 0; j < pr->rowKey.numOfPKs; j++) {
|
||||
p.rowKey.pks[j].type = pr->pkColumn.type;
|
||||
if (IS_VAR_DATA_TYPE(pr->pkColumn.type)) {
|
||||
|
||||
p.rowKey.pks[j].pData = taosMemoryCalloc(1, pr->pkColumn.bytes);
|
||||
if (p.rowKey.pks[j].pData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||
if (p.colVal.value.pData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
void* px = taosArrayPush(pLastCols, &p);
|
||||
if (px == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
taosArrayPush(pLastCols, &p);
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -449,7 +547,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
||||
tb_uid_t uid = pTableList[i].uid;
|
||||
|
||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||
code = tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||
if (code == -1) {// fix the invalid return code
|
||||
code = 0;
|
||||
} else if (code != 0) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||
taosArrayClearEx(pRow, freeItemOfRow);
|
||||
continue;
|
||||
|
@ -461,7 +565,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
||||
if (slotIds[k] == -1) continue;
|
||||
SLastCol* p = taosArrayGet(pLastCols, k);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
|
||||
if (pColVal == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (tRowKeyCompare(&pColVal->rowKey, &p->rowKey) > 0) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
|
@ -487,7 +598,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
if (k == 0) {
|
||||
if (TARRAY_SIZE(pTableUidList) == 0) {
|
||||
taosArrayPush(pTableUidList, &uid);
|
||||
void* px = taosArrayPush(pTableUidList, &uid);
|
||||
if (px == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
} else {
|
||||
taosArraySet(pTableUidList, 0, &uid);
|
||||
}
|
||||
|
@ -527,7 +642,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
if (hasRes) {
|
||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
code = saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
if (code) {
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(pLastCols, freeItemWithPk);
|
||||
|
@ -535,16 +653,31 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||
tb_uid_t uid = pTableList[i].uid;
|
||||
|
||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||
if ((code = tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype)) != 0) {
|
||||
if (code == -1) {// fix the invalid return code
|
||||
code = 0;
|
||||
} else if (code != 0) {
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||
taosArrayClearEx(pRow, freeItemOfRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
code = saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
if (code) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
taosArrayClearEx(pRow, freeItemOfRow);
|
||||
|
||||
taosArrayPush(pTableUidList, &uid);
|
||||
void* px = taosArrayPush(pTableUidList, &uid);
|
||||
if (px == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
++pr->tableIndex;
|
||||
if (pResBlock->info.rows >= pResBlock->info.capacity) {
|
||||
|
@ -561,7 +694,7 @@ _end:
|
|||
pr->pCurFileSet = NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pr->readerMutex);
|
||||
(void)taosThreadMutexUnlock(&pr->readerMutex);
|
||||
|
||||
if (pRes != NULL) {
|
||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef struct {
|
|||
static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo);
|
||||
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||
STsdbReader* pReader);
|
||||
static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRow);
|
||||
static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes);
|
||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey,
|
||||
STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
|
||||
|
@ -73,8 +73,8 @@ static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBloc
|
|||
STsdbReader* pReader);
|
||||
|
||||
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
|
||||
int8_t* pLevel);
|
||||
static void getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
|
||||
int8_t* pLevel, STsdb** pTsdb);
|
||||
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
|
||||
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||
|
@ -123,7 +123,7 @@ static void tColRowGetPriamyKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_
|
|||
pKey->pks[0].val = cv.value.val;
|
||||
} else {
|
||||
pKey->pks[0].nData = cv.value.nData;
|
||||
(void)(void)memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData);
|
||||
(void)memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -439,7 +439,6 @@ static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
|
|||
tsdbTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
code = taosThreadMutexDestroy(&pReader->readerMutex);
|
||||
|
||||
tsdbTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
return code;
|
||||
|
@ -450,7 +449,6 @@ static int32_t tsdbAcquireReader(STsdbReader* pReader) {
|
|||
tsdbTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
code = taosThreadMutexLock(&pReader->readerMutex);
|
||||
|
||||
tsdbTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
return code;
|
||||
|
@ -541,8 +539,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
}
|
||||
|
||||
initReaderStatus(&pReader->status);
|
||||
|
||||
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level);
|
||||
getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level, &pReader->pTsdb);
|
||||
|
||||
pReader->info.suid = pCond->suid;
|
||||
pReader->info.order = pCond->order;
|
||||
|
@ -550,6 +547,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||
|
||||
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
|
||||
if (idstr != NULL && pReader->idStr == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pReader->type = pCond->type;
|
||||
pReader->bFilesetDelimited = false;
|
||||
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
|
||||
|
@ -586,6 +588,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
}
|
||||
|
||||
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->resBlockInfo.pResBlock->pDataBlock, pSup->slotId[0]);
|
||||
if (pReader->status.pPrimaryTsCol == NULL) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
int32_t type = pReader->status.pPrimaryTsCol->info.type;
|
||||
if (type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
tsdbError("the first column isn't primary timestamp in result block, actual: %s, %s", tDataTypes[type].name,
|
||||
|
@ -651,7 +658,6 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
|
|||
STableUidList* pList = &pReader->status.uidList;
|
||||
|
||||
int32_t i = 0;
|
||||
|
||||
while (i < TARRAY2_SIZE(pBlkArray)) {
|
||||
pBrinBlk = &pBlkArray->data[i];
|
||||
if (pBrinBlk->maxTbid.suid < pReader->info.suid) {
|
||||
|
@ -824,6 +830,10 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
|
|||
p1 = taosArrayPush(pTableScanInfoList, &pScanInfo);
|
||||
} else {
|
||||
STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if ((*p)->uid != uid) {
|
||||
p1 = taosArrayPush(pTableScanInfoList, &pScanInfo);
|
||||
}
|
||||
|
@ -1273,6 +1283,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
|
|||
int32_t rowIndex = 0;
|
||||
|
||||
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
|
||||
i += 1;
|
||||
|
@ -1288,6 +1302,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
|
|||
colIndex += 1;
|
||||
} else if (pData->cid == pSupInfo->colId[i]) {
|
||||
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
|
||||
colDataSetNNULL(pColData, 0, dumpedRows);
|
||||
|
@ -1309,6 +1326,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
|
|||
i += 1;
|
||||
} else { // the specified column does not exist in file block, fill with null data
|
||||
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
colDataSetNNULL(pColData, 0, dumpedRows);
|
||||
i += 1;
|
||||
}
|
||||
|
@ -1317,6 +1338,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
|
|||
// fill the mis-matched columns with null value
|
||||
while (i < numOfOutputCols) {
|
||||
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
colDataSetNNULL(pColData, 0, dumpedRows);
|
||||
i += 1;
|
||||
}
|
||||
|
@ -1448,7 +1473,15 @@ static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockIn
|
|||
}
|
||||
|
||||
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
||||
if (pTableDataBlockIdx == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
blockInfoToRecord(pRecord, p, pSupInfo);
|
||||
|
||||
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||
|
@ -1462,13 +1495,21 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
|
||||
void* p = taosArrayGet(pBlockIter->blockList, index);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*) p;
|
||||
pBlockIter->index += step;
|
||||
|
||||
if (index != pBlockIter->index) {
|
||||
if (index > pBlockIter->index) {
|
||||
for (int32_t i = index - 1; i >= pBlockIter->index; --i) {
|
||||
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
|
||||
if (pBlockInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr);
|
||||
|
@ -1477,6 +1518,10 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
|
|||
}
|
||||
|
||||
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
|
||||
if (pTableDataBlockIdx == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
pTableDataBlockIdx->globalIndex = i + 1;
|
||||
|
||||
taosArraySet(pBlockIter->blockList, i + 1, pBlockInfo);
|
||||
|
@ -1484,6 +1529,9 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
|
|||
} else if (index < pBlockIter->index) {
|
||||
for (int32_t i = index + 1; i <= pBlockIter->index; ++i) {
|
||||
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
|
||||
if (pBlockInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr);
|
||||
|
@ -1492,8 +1540,11 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
|
|||
}
|
||||
|
||||
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
|
||||
pTableDataBlockIdx->globalIndex = i - 1;
|
||||
if (pTableDataBlockIdx == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
pTableDataBlockIdx->globalIndex = i - 1;
|
||||
taosArraySet(pBlockIter->blockList, i - 1, pBlockInfo);
|
||||
}
|
||||
}
|
||||
|
@ -1505,7 +1556,11 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
|
|||
return code;
|
||||
}
|
||||
|
||||
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, fblock.tbBlockIdx);
|
||||
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, fblock.tbBlockIdx);
|
||||
if (pTableDataBlockIdx == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
pTableDataBlockIdx->globalIndex = pBlockIter->index;
|
||||
}
|
||||
|
||||
|
@ -2338,7 +2393,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
pSttBlockReader->uid = pScanInfo->uid;
|
||||
|
||||
// second time init stt block reader
|
||||
if (pScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) {
|
||||
if (pScanInfo->cleanSttBlocks && (pReader->info.execMode == READER_EXEC_ROWS)) {
|
||||
return !pScanInfo->sttBlockReturned;
|
||||
}
|
||||
|
||||
|
@ -2374,6 +2429,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
};
|
||||
|
||||
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
|
||||
if (info.pKeyRangeList == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2398,6 +2456,10 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
// calculate the time window for data in stt files
|
||||
for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) {
|
||||
SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i);
|
||||
if (pKeyRange == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (pkCompEx(&pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) {
|
||||
tRowKeyAssign(&pScanInfo->sttRange.skey, &pKeyRange->skey);
|
||||
}
|
||||
|
@ -2618,6 +2680,9 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
|
|||
|
||||
// 1. find the next neighbor block in the scan block list
|
||||
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
|
||||
if (tableDataBlockIdx == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
// 2. remove it from the scan block list
|
||||
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
|
||||
|
@ -2787,6 +2852,9 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
|
|||
taosArrayClear(pBlockScanInfo->delSkyline);
|
||||
} else {
|
||||
pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
|
||||
if (pBlockScanInfo->delSkyline == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
SArray* pSource = pBlockScanInfo->pFileDelData;
|
||||
|
@ -3412,6 +3480,9 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
|
|||
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||
SBlockNumber num = {0};
|
||||
SArray* pTableList = taosArrayInit(40, POINTER_BYTES);
|
||||
if (pTableList == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = moveToNextFile(pReader, &num, pTableList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3563,8 +3634,13 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idStr,
|
||||
int8_t* pLevel) {
|
||||
static void getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idStr,
|
||||
int8_t* pLevel, STsdb** pTsdb) {
|
||||
if (pTsdb == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
*pTsdb = NULL;
|
||||
if (VND_IS_RSMA(pVnode) && !pCond->skipRollup) {
|
||||
int8_t level = 0;
|
||||
int8_t precision = pVnode->config.tsdbCfg.precision;
|
||||
|
@ -3573,7 +3649,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SR
|
|||
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
|
||||
: 1000000L);
|
||||
|
||||
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||
SRetention* pRetention = retentions + level;
|
||||
if (pRetention->keep <= 0) {
|
||||
if (level > 0) {
|
||||
|
@ -3592,19 +3668,22 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SR
|
|||
if (level == TSDB_RETENTION_L0) {
|
||||
*pLevel = TSDB_RETENTION_L0;
|
||||
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
|
||||
return VND_RSMA0(pVnode);
|
||||
*pTsdb = VND_RSMA0(pVnode);
|
||||
return;
|
||||
} else if (level == TSDB_RETENTION_L1) {
|
||||
*pLevel = TSDB_RETENTION_L1;
|
||||
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
|
||||
return VND_RSMA1(pVnode);
|
||||
*pTsdb = VND_RSMA1(pVnode);
|
||||
return;
|
||||
} else {
|
||||
*pLevel = TSDB_RETENTION_L2;
|
||||
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
|
||||
return VND_RSMA2(pVnode);
|
||||
*pTsdb = VND_RSMA2(pVnode);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
return VND_TSDB(pVnode);
|
||||
*pTsdb = VND_TSDB(pVnode);
|
||||
}
|
||||
|
||||
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
|
||||
|
@ -3631,6 +3710,10 @@ static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int6
|
|||
}
|
||||
|
||||
TSDBKEY* p = taosArrayGet(pDelList, start);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
while (p->ts >= key && start > 0) {
|
||||
start -= 1;
|
||||
}
|
||||
|
@ -3640,6 +3723,10 @@ static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int6
|
|||
}
|
||||
|
||||
TSDBKEY* p = taosArrayGet(pDelList, start);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
while (p->ts <= key && start < num - 1) {
|
||||
start += 1;
|
||||
}
|
||||
|
@ -3665,17 +3752,27 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
|
|||
if (asc) {
|
||||
if (*index >= num - 1) {
|
||||
TSDBKEY* last = taosArrayGetLast(pDelList);
|
||||
ASSERT(key >= last->ts);
|
||||
if (last == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ASSERT(key >= last->ts);
|
||||
if (key > last->ts) {
|
||||
return false;
|
||||
} else if (key == last->ts) {
|
||||
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
|
||||
if (prev == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (prev->version >= ver && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer);
|
||||
}
|
||||
} else {
|
||||
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
||||
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
|
||||
if (pCurrent == NULL || pNext == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (key < pCurrent->ts) {
|
||||
return false;
|
||||
|
@ -3692,6 +3789,9 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
|
|||
if ((*index) < num - 1) {
|
||||
pCurrent = taosArrayGet(pDelList, *index);
|
||||
pNext = taosArrayGet(pDelList, (*index) + 1);
|
||||
if (pCurrent == NULL || pNext == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// it is not a consecutive deletion range, ignore it
|
||||
if (pCurrent->version == 0 && pNext->version > 0) {
|
||||
|
@ -3710,6 +3810,9 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
|
|||
} else {
|
||||
if (*index <= 0) {
|
||||
TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
|
||||
if (pFirst == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (key < pFirst->ts) {
|
||||
return false;
|
||||
|
@ -3721,6 +3824,9 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
|
|||
} else {
|
||||
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
||||
TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);
|
||||
if (pCurrent == NULL || pPrev == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (key > pCurrent->ts) {
|
||||
return false;
|
||||
|
@ -3736,6 +3842,9 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
|
|||
if ((*index) >= 1) {
|
||||
pCurrent = taosArrayGet(pDelList, *index);
|
||||
pPrev = taosArrayGet(pDelList, (*index) - 1);
|
||||
if (pCurrent == NULL || pPrev == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// it is not a consecutive deletion range, ignore it
|
||||
if (pCurrent->version > 0 && pPrev->version == 0) {
|
||||
|
@ -4229,6 +4338,10 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
|
|||
|
||||
if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
|
||||
i += 1;
|
||||
}
|
||||
|
@ -4238,6 +4351,9 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
|
|||
|
||||
if (colId == pSchema->columns[j].colId) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColInfoData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
code = tRowGet(pTSRow, pSchema, j, &colVal);
|
||||
if (code) {
|
||||
|
@ -4252,6 +4368,9 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
|
|||
j += 1;
|
||||
} else if (colId < pSchema->columns[j].colId) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColInfoData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
colDataSetNULL(pColInfoData, outputRowIndex);
|
||||
i += 1;
|
||||
|
@ -4263,6 +4382,10 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
|
|||
// set null value since current column does not exist in the "pSchema"
|
||||
while (i < pSupInfo->numOfCols) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pColInfoData == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
colDataSetNULL(pColInfoData, outputRowIndex);
|
||||
i += 1;
|
||||
}
|
||||
|
@ -4311,6 +4434,10 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
|||
|
||||
while (i < numOfOutputCols) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||
if (pCol == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
colDataSetNULL(pCol, outputRowIndex);
|
||||
i += 1;
|
||||
}
|
||||
|
@ -4494,19 +4621,16 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
STimeWindow window = pCond->twindows;
|
||||
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t capacity = pConf->tsdbCfg.maxRows;
|
||||
if (pResBlock != NULL) {
|
||||
code = blockDataEnsureCapacity(pResBlock, capacity);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
|
||||
// check for query time window
|
||||
STsdbReader* pReader = *ppReader;
|
||||
|
@ -4530,9 +4654,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
|
||||
// here we only need one more row, so the capacity is set to be ONE.
|
||||
code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[0], 1, pResBlock, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
pCond->twindows.skey = window.ekey + 1;
|
||||
|
@ -4544,9 +4666,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
pCond->order = order;
|
||||
|
||||
code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[1], 1, pResBlock, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
|
||||
|
@ -4567,9 +4687,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
|
||||
if (pReader->info.pSchema != NULL) {
|
||||
code = tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
|
||||
|
@ -4582,18 +4700,13 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
|
||||
if (pReader->info.pSchema != NULL) {
|
||||
code = updateBlockSMAInfo(pReader->info.pSchema, &pReader->suppInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
|
||||
|
||||
code = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables, &pReader->status.pTableMap);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
*ppReader = NULL;
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
|
||||
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
if (pReader->status.pLDataIterArray == NULL) {
|
||||
|
|
|
@ -122,7 +122,9 @@ void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
|
|||
size_t num = taosArrayGetSize(pBuf->pData);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
char** p = taosArrayGet(pBuf->pData, i);
|
||||
taosMemoryFree(*p);
|
||||
if (p != NULL) {
|
||||
taosMemoryFree(*p);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBuf->pData);
|
||||
|
@ -446,8 +448,8 @@ void cleanupInfoForNextFileset(SSHashObj* pTableMap) {
|
|||
|
||||
// brin records iterator
|
||||
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) {
|
||||
memset(&pIter->block, 0, sizeof(SBrinBlock));
|
||||
memset(&pIter->record, 0, sizeof(SBrinRecord));
|
||||
(void) memset(&pIter->block, 0, sizeof(SBrinBlock));
|
||||
(void) memset(&pIter->record, 0, sizeof(SBrinRecord));
|
||||
pIter->blockIndex = -1;
|
||||
pIter->recordIndex = -1;
|
||||
|
||||
|
@ -465,6 +467,9 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
|
|||
}
|
||||
|
||||
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
|
||||
if (pIter->pCurrentBlk == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
(void) tBrinBlockClear(&pIter->block);
|
||||
int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
|
||||
|
@ -631,6 +636,10 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
|
||||
for (int32_t k = 0; k < num; ++k) {
|
||||
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
if (pBlockInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
sup.pDataBlockInfo[sup.numOfTables][k] =
|
||||
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo};
|
||||
cnt++;
|
||||
|
@ -689,6 +698,10 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
int32_t index = sup.indexPerTable[pos]++;
|
||||
|
||||
SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
||||
if (pBlockInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
void* px = taosArrayPush(pBlockIter->blockList, pBlockInfo);
|
||||
if (px == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1208,6 +1221,10 @@ bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlo
|
|||
}
|
||||
|
||||
SSttKeyRange* pRange = taosArrayGet(pKeyRangeList, 0);
|
||||
if (pRange == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
STimeWindow w = {.skey = pRange->skey.ts, .ekey = pRange->ekey.ts};
|
||||
if (overlapWithTimeWindow(&w, pQueryWindow, pScanInfo, order)) {
|
||||
return false;
|
||||
|
@ -1216,6 +1233,9 @@ bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlo
|
|||
for (int32_t i = 0; i < num - 1; ++i) {
|
||||
SSttKeyRange* p1 = taosArrayGet(pKeyRangeList, i);
|
||||
SSttKeyRange* p2 = taosArrayGet(pKeyRangeList, i + 1);
|
||||
if (p1 == NULL || p2 == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (p1->ekey.ts >= p2->skey.ts) {
|
||||
return false;
|
||||
|
@ -1237,6 +1257,10 @@ static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const S
|
|||
|
||||
for (int32_t i = startIndex; i < num; i += 1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
|
||||
if (p == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (p->ts >= pRecord->firstKey.key.ts && p->ts <= pRecord->lastKey.key.ts) {
|
||||
if (p->version >= pRecord->minVer) {
|
||||
return true;
|
||||
|
@ -1245,6 +1269,10 @@ static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const S
|
|||
if (p->version >= pRecord->minVer) {
|
||||
if (i < num - 1) {
|
||||
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
|
||||
if (pnext == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pnext->ts >= pRecord->firstKey.key.ts) {
|
||||
return true;
|
||||
}
|
||||
|
@ -1266,11 +1294,19 @@ static bool doCheckDatablockOverlapWithoutVersion(STableBlockScanInfo* pBlockSca
|
|||
|
||||
for (int32_t i = startIndex; i < num; i += 1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
|
||||
if (p == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (p->ts >= pRecord->firstKey.key.ts && p->ts <= pRecord->lastKey.key.ts) {
|
||||
return true;
|
||||
} else if (p->ts < pRecord->firstKey.key.ts) { // p->ts < pBlock->minKey.ts
|
||||
if (i < num - 1) {
|
||||
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
|
||||
if (pnext == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pnext->ts >= pRecord->firstKey.key.ts) {
|
||||
return true;
|
||||
}
|
||||
|
@ -1291,6 +1327,10 @@ bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecor
|
|||
// ts is not overlap
|
||||
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
|
||||
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
|
||||
if (pFirst == NULL || pLast == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pRecord->firstKey.key.ts > pLast->ts || pRecord->lastKey.key.ts < pFirst->ts) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1302,6 +1342,10 @@ bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecor
|
|||
int32_t index = pBlockScanInfo->fileDelIndex;
|
||||
while (1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
|
||||
if (p == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (p->ts > pRecord->firstKey.key.ts && index > 0) {
|
||||
index -= 1;
|
||||
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
|
||||
|
@ -1324,6 +1368,10 @@ bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const
|
|||
// ts is not overlap
|
||||
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
|
||||
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
|
||||
if (pFirst == NULL || pLast == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pRecord->firstKey.key.ts > pLast->ts || pRecord->lastKey.key.ts < pFirst->ts) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1335,6 +1383,10 @@ bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const
|
|||
int32_t index = pBlockScanInfo->fileDelIndex;
|
||||
while (1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
|
||||
if (p == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (p->ts > pRecord->firstKey.key.ts && index > 0) {
|
||||
index -= 1;
|
||||
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
|
||||
|
|
|
@ -140,7 +140,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle);
|
||||
|
||||
|
@ -164,7 +164,7 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
|
|||
|
||||
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -98,16 +98,17 @@ struct SExecTaskInfo {
|
|||
SQueryAutoQWorkerPoolCB* pWorkerCb;
|
||||
};
|
||||
|
||||
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
|
||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI);
|
||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||
bool isTaskKilled(void* pTaskInfo);
|
||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
|
||||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
||||
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
|
||||
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
|
||||
int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI,
|
||||
SExecTaskInfo** pTaskInfo);
|
||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||
bool isTaskKilled(void* pTaskInfo);
|
||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
|
||||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
||||
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -77,9 +77,9 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
|
|||
* @param sortBufSize sort memory buf size, for check if heap sort is applicable
|
||||
* @return
|
||||
*/
|
||||
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
|
||||
int32_t tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
|
||||
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
|
||||
uint32_t pqSortBufSize);
|
||||
uint32_t pqSortBufSize, SSortHandle** pHandle);
|
||||
|
||||
void tsortSetForceUsePQSort(SSortHandle* pHandle);
|
||||
|
||||
|
@ -101,7 +101,7 @@ int32_t tsortOpen(SSortHandle* pHandle);
|
|||
* @param pHandle
|
||||
* @return
|
||||
*/
|
||||
int32_t tsortClose(SSortHandle* pHandle);
|
||||
void tsortClose(SSortHandle* pHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -116,16 +116,17 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc
|
|||
* @param fp
|
||||
* @return
|
||||
*/
|
||||
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
|
||||
void tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId);
|
||||
void tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -140,7 +141,7 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource);
|
|||
* @param pHandle
|
||||
* @return
|
||||
*/
|
||||
STupleHandle* tsortNextTuple(SSortHandle* pHandle);
|
||||
int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -156,7 +157,7 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
|
|||
* @param colId
|
||||
* @return
|
||||
*/
|
||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
||||
void tsortGetValue(STupleHandle* pVHandle, int32_t colId, void** pVal);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -164,13 +165,13 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
|||
* @return
|
||||
*/
|
||||
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
|
||||
void* tsortGetBlockInfo(STupleHandle* pVHandle);
|
||||
void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pInfo);
|
||||
/**
|
||||
*
|
||||
* @param pSortHandle
|
||||
* @return
|
||||
*/
|
||||
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
|
||||
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock);
|
||||
|
||||
/**
|
||||
* return the sort execution information.
|
||||
|
@ -197,7 +198,8 @@ void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), vo
|
|||
|
||||
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsSize);
|
||||
|
||||
void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
|
||||
/**
|
||||
* @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen
|
||||
* @param [in] pSortCols cols to comp and build
|
||||
|
@ -213,8 +215,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke
|
|||
*/
|
||||
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param);
|
||||
|
||||
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
|
||||
int32_t leftRowIndex, int32_t rightRowIndex, void* pOrder);
|
||||
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, int32_t rightRowIndex,
|
||||
void* pOrder);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -377,7 +377,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
pInfo->pLastrowReader = pReaderFn->closeReader(pInfo->pLastrowReader);
|
||||
pReaderFn->closeReader(pInfo->pLastrowReader);
|
||||
pInfo->pLastrowReader = NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -396,7 +397,8 @@ void destroyCacheScanOperator(void* param) {
|
|||
tableListDestroy(pInfo->pTableList);
|
||||
|
||||
if (pInfo->pLastrowReader != NULL) {
|
||||
pInfo->pLastrowReader = pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
|
||||
pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
|
||||
pInfo->pLastrowReader = NULL;
|
||||
}
|
||||
|
||||
cleanupExprSupp(&pInfo->pseudoExprSup);
|
||||
|
|
|
@ -288,8 +288,10 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
|
|||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
|
||||
uint64_t id) {
|
||||
if (msg == NULL) { // create raw scan
|
||||
SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api);
|
||||
if (NULL == pTaskInfo) {
|
||||
SExecTaskInfo* pTaskInfo = NULL;
|
||||
|
||||
int32_t code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pTaskInfo);
|
||||
if (NULL == pTaskInfo || code != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1304,9 +1306,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pScanInfo->scanTimes = 0;
|
||||
|
||||
if (pScanBaseInfo->dataReader == NULL) {
|
||||
int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(
|
||||
pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock,
|
||||
(void**)&pScanBaseInfo->dataReader, id, NULL);
|
||||
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond,
|
||||
&keyInfo, 1, pScanInfo->pResBlock,
|
||||
(void**)&pScanBaseInfo->dataReader, id, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
|
||||
terrno = code;
|
||||
|
@ -1450,7 +1452,13 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SExecTaskInfo* pTaskInfo = tinfo;
|
||||
SArray* plist = getTableListInfo(pTaskInfo);
|
||||
SArray* plist = NULL;
|
||||
|
||||
code = getTableListInfo(pTaskInfo, &plist);
|
||||
if (code || plist == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// only extract table in the first elements
|
||||
STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);
|
||||
|
@ -1502,11 +1510,21 @@ _end:
|
|||
}
|
||||
}
|
||||
|
||||
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
|
||||
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
|
||||
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
|
||||
if (pList == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
|
||||
if (pArray == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
extractTableList(pArray, pOperator);
|
||||
return pArray;
|
||||
|
||||
*pList = pArray;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
|
||||
|
|
|
@ -1067,7 +1067,12 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SArray* pInfoList = getTableListInfo(pTask);
|
||||
SArray* pInfoList = NULL;
|
||||
int32_t code = getTableListInfo(pTask, &pInfoList);
|
||||
if (code || pInfoList == NULL) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
|
||||
taosArrayDestroy(pInfoList);
|
||||
|
||||
|
|
|
@ -70,8 +70,12 @@ int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t numOfBufPage = pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize;
|
||||
|
||||
pSortMergeInfo->pSortHandle = tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize, numOfBufPage,
|
||||
pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||
pSortMergeInfo->pSortHandle = NULL;
|
||||
int32_t code = tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize,
|
||||
numOfBufPage, pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pSortMergeInfo->pSortHandle);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tsortSetFetchRawDataFp(pSortMergeInfo->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL);
|
||||
tsortSetCompareGroupId(pSortMergeInfo->pSortHandle, pInfo->groupMerge);
|
||||
|
@ -96,12 +100,17 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
|||
SSDataBlock* p, bool* newgroup) {
|
||||
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
|
||||
*newgroup = false;
|
||||
int32_t code = 0;
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->groupMerge || pInfo->inputWithGroupId) {
|
||||
if (pSortMergeInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
pTupleHandle = NULL;
|
||||
code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||
if (code) {
|
||||
// todo handle error
|
||||
}
|
||||
} else {
|
||||
pTupleHandle = pSortMergeInfo->prefetchedTuple;
|
||||
pSortMergeInfo->prefetchedTuple = NULL;
|
||||
|
@ -112,11 +121,11 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
|||
}
|
||||
}
|
||||
} else {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||
pInfo->groupId = 0;
|
||||
}
|
||||
|
||||
if (pTupleHandle == NULL) {
|
||||
if (pTupleHandle == NULL || (code != 0)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -159,8 +168,10 @@ SSDataBlock* doSortMerge(SOperatorInfo* pOperator) {
|
|||
blockDataCleanup(pDataBlock);
|
||||
|
||||
if (pSortMergeInfo->pIntermediateBlock == NULL) {
|
||||
pSortMergeInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle);
|
||||
if (pSortMergeInfo->pIntermediateBlock == NULL) {
|
||||
pSortMergeInfo->pIntermediateBlock = NULL;
|
||||
|
||||
int32_t code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
|
||||
if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -35,38 +35,51 @@
|
|||
|
||||
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
||||
|
||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI) {
|
||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI,
|
||||
SExecTaskInfo** pTaskInfo) {
|
||||
if (pTaskInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||
pTaskInfo->cost.created = taosGetTimestampUs();
|
||||
SExecTaskInfo* p = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pTaskInfo->execModel = model;
|
||||
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
||||
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
|
||||
pTaskInfo->storageAPI = *pAPI;
|
||||
setTaskStatus(p, TASK_NOT_COMPLETED);
|
||||
p->cost.created = taosGetTimestampUs();
|
||||
|
||||
taosInitRWLatch(&pTaskInfo->lock);
|
||||
p->execModel = model;
|
||||
p->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
||||
p->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
|
||||
if (p->stopInfo.pStopInfo == NULL || p->pResultBlockList == NULL) {
|
||||
doDestroyTask(p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pTaskInfo->id.vgId = vgId;
|
||||
pTaskInfo->id.queryId = queryId;
|
||||
pTaskInfo->id.taskId = taskId;
|
||||
pTaskInfo->id.str = taosMemoryMalloc(64);
|
||||
buildTaskId(taskId, queryId, pTaskInfo->id.str);
|
||||
pTaskInfo->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
|
||||
|
||||
return pTaskInfo;
|
||||
p->storageAPI = *pAPI;
|
||||
taosInitRWLatch(&p->lock);
|
||||
|
||||
p->id.vgId = vgId;
|
||||
p->id.queryId = queryId;
|
||||
p->id.taskId = taskId;
|
||||
p->id.str = taosMemoryMalloc(64);
|
||||
buildTaskId(taskId, queryId, p->id.str);
|
||||
p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
|
||||
if (p->id.str == NULL || p->schemaInfos == NULL) {
|
||||
doDestroyTask(p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pTaskInfo = p;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool isTaskKilled(void* pTaskInfo) { return (0 != ((SExecTaskInfo*)pTaskInfo)->code); }
|
||||
|
||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
|
||||
pTaskInfo->code = rspCode;
|
||||
stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str, &pTaskInfo->storageAPI);
|
||||
(void) stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str, &pTaskInfo->storageAPI);
|
||||
}
|
||||
|
||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
|
||||
|
@ -81,10 +94,10 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
|
|||
|
||||
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
|
||||
*pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api);
|
||||
if (*pTaskInfo == NULL) {
|
||||
int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo);
|
||||
if (*pTaskInfo == NULL || code != 0) {
|
||||
taosMemoryFree(sql);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pHandle) {
|
||||
|
@ -165,12 +178,10 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
|||
}
|
||||
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
||||
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
|
||||
|
||||
taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
|
||||
return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
||||
|
|
|
@ -5145,8 +5145,13 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||
pInfo->pSortHandle = NULL;
|
||||
code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0, &pInfo->pSortHandle);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInfo->bSortRowId && numOfTable != 1) {
|
||||
int32_t memSize = 512 * 1024 * 1024;
|
||||
code = tsortSetSortByRowId(pInfo->pSortHandle, memSize);
|
||||
|
@ -5280,10 +5285,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
|||
STupleHandle* pTupleHandle = NULL;
|
||||
while (1) {
|
||||
while (1) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
pTupleHandle = NULL;
|
||||
int32_t code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||
if (pTupleHandle == NULL || code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle);
|
||||
if (pResBlock->info.rows >= capacity) {
|
||||
break;
|
||||
|
|
|
@ -157,7 +157,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
|||
if (isNull) {
|
||||
colDataSetNULL(pColInfo, pBlock->info.rows);
|
||||
} else {
|
||||
char* pData = tsortGetValue(pTupleHandle, i);
|
||||
char* pData = NULL;
|
||||
tsortGetValue(pTupleHandle, i, (void**) &pData);
|
||||
if (pData != NULL) {
|
||||
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
|
||||
}
|
||||
|
@ -165,7 +166,11 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
|||
}
|
||||
|
||||
pBlock->info.dataLoad = 1;
|
||||
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
|
||||
|
||||
SDataBlockInfo info = {0};
|
||||
tsortGetBlockInfo(pTupleHandle, &info);
|
||||
|
||||
pBlock->info.scanFlag = info.scanFlag;
|
||||
pBlock->info.rows += 1;
|
||||
}
|
||||
|
||||
|
@ -176,9 +181,13 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
|||
* @retval NULL if no more tuples
|
||||
*/
|
||||
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
int32_t code = 0;
|
||||
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
|
||||
if (!retTuple) {
|
||||
retTuple = tsortNextTuple(pHandle);
|
||||
code = tsortNextTuple(pHandle, &retTuple);
|
||||
if (code) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (retTuple) {
|
||||
|
@ -219,8 +228,9 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
|||
SSortOperatorInfo* pInfo) {
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
if (p == NULL) {
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t code = tsortGetSortedDataBlock(pHandle, &p);
|
||||
if (p == NULL || (code != 0)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -231,9 +241,9 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
|||
if (pInfo->pGroupIdCalc) {
|
||||
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
|
||||
} else {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||
}
|
||||
if (pTupleHandle == NULL) {
|
||||
if (pTupleHandle == NULL || code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -295,8 +305,12 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
|||
|
||||
pInfo->startTs = taosGetTimestampUs();
|
||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
|
||||
pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024);
|
||||
pInfo->pSortHandle = NULL;
|
||||
int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
|
||||
pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
||||
|
||||
|
@ -305,8 +319,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
|||
ps->onlyRef = true;
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||
|
||||
code = tsortOpen(pInfo->pSortHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
@ -434,16 +447,18 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
|
|||
blockDataCleanup(pDataBlock);
|
||||
blockDataEnsureCapacity(pDataBlock, capacity);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
if (p == NULL) {
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t code = tsortGetSortedDataBlock(pHandle, &p);
|
||||
if (p == NULL || (code != 0)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||
if (pTupleHandle == NULL || code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -508,8 +523,13 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||
pInfo->pCurrSortHandle =
|
||||
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0, 0);
|
||||
pInfo->pCurrSortHandle = NULL;
|
||||
|
||||
int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0,
|
||||
0, &pInfo->pCurrSortHandle);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
|
||||
|
||||
|
@ -521,13 +541,12 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
|
|||
ps->onlyRef = false;
|
||||
tsortAddSource(pInfo->pCurrSortHandle, ps);
|
||||
|
||||
int32_t code = tsortOpen(pInfo->pCurrSortHandle);
|
||||
|
||||
code = tsortOpen(pInfo->pCurrSortHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t finishSortGroup(SOperatorInfo* pOperator) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -36,7 +36,7 @@ static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId,
|
|||
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
|
||||
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
|
||||
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
|
||||
static void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo);
|
||||
|
||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
||||
int64_t* oldStage) {
|
||||
|
@ -402,22 +402,26 @@ void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutput
|
|||
pInfo->stopCheckProcess = 0;
|
||||
}
|
||||
|
||||
SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
|
||||
void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo) {
|
||||
if (pStatusInfo == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
*pStatusInfo = NULL;
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||
if (p->taskId == taskId) {
|
||||
return p;
|
||||
*pStatusInfo = p;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
||||
int32_t* pNotReady, const char* id) {
|
||||
streamMutexLock(&pInfo->checkInfoLock);
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
streamMutexLock(&pInfo->checkInfoLock);
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
if (p != NULL) {
|
||||
if (reqId != p->reqId) {
|
||||
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64
|
||||
|
@ -495,7 +499,8 @@ void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId,
|
|||
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
|
||||
streamMutexLock(&pInfo->checkInfoLock);
|
||||
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
if (p != NULL) {
|
||||
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
@ -598,7 +603,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
|||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
if (p != NULL) {
|
||||
ASSERT(p->status == -1 && p->rspTs == 0);
|
||||
doSendCheckMsg(pTask, p);
|
||||
|
@ -613,7 +619,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
|||
|
||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
if (p != NULL) {
|
||||
addIntoNodeUpdateList(pTask, p->vgId);
|
||||
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
|
||||
|
@ -640,7 +647,8 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
|||
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
||||
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
if (p != NULL) {
|
||||
p->rspTs = 0;
|
||||
p->status = -1;
|
||||
|
|
|
@ -176,6 +176,10 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
|
|||
|
||||
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
||||
if (pDataBlock == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int64_t checkpointId = pDataBlock->info.version;
|
||||
int32_t transId = pDataBlock->info.window.skey;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
@ -248,6 +252,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
// check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
|
||||
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (p->upstreamTaskId == pBlock->srcTaskId) {
|
||||
ASSERT(p->checkpointId == checkpointId);
|
||||
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
|
||||
|
@ -381,6 +389,10 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
|
|||
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (p->downstreamTaskId == downstreamTaskId) {
|
||||
received = true;
|
||||
break;
|
||||
|
@ -420,6 +432,10 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
|
|||
streamMutexLock(&pInfo->lock);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
||||
if (pReadyInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) {
|
||||
pReadyInfo->sendCompleted = 1;
|
||||
stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
|
||||
|
@ -430,6 +446,10 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
||||
if (pReadyInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (pReadyInfo->sendCompleted == 1) {
|
||||
numOfConfirmed += 1;
|
||||
}
|
||||
|
@ -819,6 +839,10 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
bool recved = false;
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
|
||||
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
|
||||
if (pReady == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pInfo->nodeId == pReady->upstreamNodeId) {
|
||||
recved = true;
|
||||
break;
|
||||
|
@ -867,6 +891,9 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
|
|||
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
|
||||
if (pUpstreamTask == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
|
||||
if (pReq == NULL) {
|
||||
|
@ -917,6 +944,10 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||
if (pSendInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (pSendInfo->nodeId != downstreamNodeId) {
|
||||
continue;
|
||||
}
|
||||
|
@ -974,6 +1005,9 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
|
|||
} else {
|
||||
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
|
@ -993,6 +1027,10 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
|
|||
streamMutexLock(&pInfo->lock);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||
if (p == NULL) {
|
||||
return num;
|
||||
}
|
||||
|
||||
if (p->recved) {
|
||||
num++;
|
||||
}
|
||||
|
@ -1009,6 +1047,10 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->nodeId == vgId) {
|
||||
ASSERT(p->recved == false);
|
||||
|
||||
|
|
|
@ -539,9 +539,11 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|||
streamMetaReleaseTask(pMeta, p);
|
||||
}
|
||||
|
||||
int32_t code = taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
|
||||
if (code) {
|
||||
stError("vgId:%d remove stream backend Ref failed, rid:%"PRId64, pMeta->vgId, pMeta->streamBackendRid);
|
||||
if (pMeta->streamBackendRid != 0) {
|
||||
int32_t code = taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
|
||||
if (code) {
|
||||
stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
|
||||
}
|
||||
}
|
||||
|
||||
taosHashClear(pMeta->pTasksMap);
|
||||
|
@ -726,7 +728,7 @@ int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_
|
|||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask == NULL || streamTaskShouldStop(*ppTask)) {
|
||||
*pTask = NULL;
|
||||
return TSDB_CODE_FAILED;
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
||||
|
|
|
@ -104,10 +104,10 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
if (code) {
|
||||
stError("s-task:%s sched task failed, code:%s", pTask->id.idStr, strerror(code));
|
||||
stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, strerror(code), ref);
|
||||
} else {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime,
|
||||
ref);
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ endi
|
|||
|
||||
sql select last(f1) from tb1
|
||||
if $rows != 1 then
|
||||
print expect 1, actual $rows
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 6 then
|
||||
|
|
|
@ -67,11 +67,15 @@ class TDTestCase:
|
|||
tdSql.execute(f'use {self.dbname}')
|
||||
tdSql.execute(f'create stable {self.stbname} (ts timestamp,c0 int) tags(t0 int)')
|
||||
tdSql.execute(f'create stream `{self.streamname}` into `{self.streamtb}` as select count(*) from {self.stbname} interval(10s);')
|
||||
|
||||
time.sleep(15)
|
||||
tdSql.query('show streams')
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0],self.streamname)
|
||||
tdSql.execute(f'drop stream {self.streamname}')
|
||||
tdSql.execute(f'drop stable {self.streamtb}')
|
||||
tdSql.execute(f'create stream {self.streamname} into `{self.streamtb}` as select count(*) from {self.stbname} interval(10s);')
|
||||
|
||||
time.sleep(10)
|
||||
tdSql.query('show streams')
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0],self.streamname)
|
||||
tdSql.execute(f'drop stream `{self.streamname}`')
|
||||
|
|
|
@ -123,8 +123,8 @@ class TDTestCase:
|
|||
{
|
||||
"name": "checkpointInterval",
|
||||
"alias": "tsStreamCheckpointInterval",
|
||||
"values": [60, 1000, 1200],
|
||||
"except_values": [59, 1201]
|
||||
"values": [60, 1000, 1800],
|
||||
"except_values": [59, 1801]
|
||||
},
|
||||
{
|
||||
"name": "trimVDbIntervalSec",
|
||||
|
|
|
@ -146,11 +146,15 @@ class TDTestCase:
|
|||
tdSql.execute(f'create table {stbname} (ts timestamp,c0 int) tags(t0 int)')
|
||||
tdSql.execute(f'create table tb using {stbname} tags(1)')
|
||||
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
|
||||
time.sleep(5)
|
||||
|
||||
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
|
||||
print(tdSql.queryResult)
|
||||
tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
|
||||
tdSql.execute(f'drop stream {stream_name}')
|
||||
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
|
||||
time.sleep(5)
|
||||
|
||||
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
|
||||
tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
|
||||
tdSql.execute(f'drop database {self.dbname}')
|
||||
|
|
Loading…
Reference in New Issue