Merge branch '3.0' into szhou/tms-wc/save-row
This commit is contained in:
commit
afa70e5aed
|
@ -306,7 +306,7 @@ def pre_test_build_win() {
|
||||||
cd %WIN_CONNECTOR_ROOT%
|
cd %WIN_CONNECTOR_ROOT%
|
||||||
python.exe -m pip install --upgrade pip
|
python.exe -m pip install --upgrade pip
|
||||||
python -m pip uninstall taospy -y
|
python -m pip uninstall taospy -y
|
||||||
python -m pip install taospy==2.7.12
|
python -m pip install taospy==2.7.13
|
||||||
python -m pip uninstall taos-ws-py -y
|
python -m pip uninstall taos-ws-py -y
|
||||||
python -m pip install taos-ws-py==0.3.1
|
python -m pip install taos-ws-py==0.3.1
|
||||||
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
|
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
|
||||||
|
|
|
@ -3331,7 +3331,7 @@ typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq;
|
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t reserved;
|
int8_t reserved;
|
||||||
|
|
|
@ -406,10 +406,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
|
|
||||||
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
|
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
|
||||||
if (pStmt->bInfo.inExecCache) {
|
if (pStmt->bInfo.inExecCache) {
|
||||||
if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) {
|
|
||||||
tscError("stmtGetFromCache error");
|
|
||||||
return TSDB_CODE_TSC_STMT_CACHE_ERROR;
|
|
||||||
}
|
|
||||||
pStmt->bInfo.needParse = false;
|
pStmt->bInfo.needParse = false;
|
||||||
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
|
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -69,12 +69,6 @@ typedef struct SNodeEntry {
|
||||||
int64_t hbTimestamp; // second
|
int64_t hbTimestamp; // second
|
||||||
} SNodeEntry;
|
} SNodeEntry;
|
||||||
|
|
||||||
typedef struct SFailedCheckpointInfo {
|
|
||||||
int64_t streamUid;
|
|
||||||
int64_t checkpointId;
|
|
||||||
int32_t transId;
|
|
||||||
} SFailedCheckpointInfo;
|
|
||||||
|
|
||||||
#define MND_STREAM_CREATE_NAME "stream-create"
|
#define MND_STREAM_CREATE_NAME "stream-create"
|
||||||
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
||||||
#define MND_STREAM_PAUSE_NAME "stream-pause"
|
#define MND_STREAM_PAUSE_NAME "stream-pause"
|
||||||
|
@ -97,9 +91,14 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId)
|
||||||
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
|
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
|
||||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
|
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
|
||||||
|
|
||||||
|
typedef struct SOrphanTask {
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t taskId;
|
||||||
|
int32_t nodeId;
|
||||||
|
} SOrphanTask;
|
||||||
|
|
||||||
// for sma
|
// for sma
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
|
||||||
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
|
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
|
||||||
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
|
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
|
||||||
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
||||||
|
@ -119,7 +118,8 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode)
|
||||||
int32_t initStreamNodeList(SMnode *pMnode);
|
int32_t initStreamNodeList(SMnode *pMnode);
|
||||||
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
|
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
|
||||||
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -865,7 +865,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -917,7 +917,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
||||||
|
|
||||||
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
|
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
|
||||||
if (pStream != NULL && pStream->smaId == pSma->uid) {
|
if (pStream != NULL && pStream->smaId == pSma->uid) {
|
||||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||||
mndReleaseStream(pMnode, pStream);
|
mndReleaseStream(pMnode, pStream);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -608,50 +608,6 @@ _OVER:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
|
||||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
|
||||||
if (pReq == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
|
||||||
pReq->taskId = pTask->id.taskId;
|
|
||||||
pReq->streamId = pTask->id.streamId;
|
|
||||||
|
|
||||||
SEpSet epset = {0};
|
|
||||||
bool hasEpset = false;
|
|
||||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
|
||||||
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
|
|
||||||
terrno = code;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
|
||||||
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
|
|
||||||
if (code != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
|
||||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
|
||||||
for (int32_t i = 0; i < lv; i++) {
|
|
||||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
|
||||||
int32_t sz = taosArrayGetSize(pTasks);
|
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
|
||||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
|
||||||
if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
|
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
|
||||||
int32_t numOfStream = 0;
|
int32_t numOfStream = 0;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
@ -1200,7 +1156,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
|
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
|
||||||
|
|
||||||
// drop all tasks
|
// drop all tasks
|
||||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -1264,7 +1220,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
#if 0
|
#if 0
|
||||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
|
|
@ -16,6 +16,12 @@
|
||||||
#include "mndStream.h"
|
#include "mndStream.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
|
typedef struct SFailedCheckpointInfo {
|
||||||
|
int64_t streamUid;
|
||||||
|
int64_t checkpointId;
|
||||||
|
int32_t transId;
|
||||||
|
} SFailedCheckpointInfo;
|
||||||
|
|
||||||
static void doExtractTasksFromStream(SMnode *pMnode) {
|
static void doExtractTasksFromStream(SMnode *pMnode) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
@ -177,10 +183,51 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
|
||||||
|
SOrphanTask* pTask = taosArrayGet(pList, 0);
|
||||||
|
|
||||||
|
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||||
|
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
|
||||||
|
if (conflict) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
|
||||||
|
STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream");
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("failed to create trans to drop orphan tasks since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
|
||||||
|
|
||||||
|
// drop all tasks
|
||||||
|
if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) {
|
||||||
|
mError("failed to create trans to drop orphan tasks since %s", terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// drop stream
|
||||||
|
if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) {
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamHbMsg req = {0};
|
SStreamHbMsg req = {0};
|
||||||
SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||||
|
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||||
|
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
||||||
|
@ -198,8 +245,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
// extract stream task list
|
// extract stream task list
|
||||||
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
|
if (taosHashGetSize(execInfo.pTaskMap) == 0) {
|
||||||
if (numOfExisted == 0) {
|
|
||||||
doExtractTasksFromStream(pMnode);
|
doExtractTasksFromStream(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,6 +264,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
||||||
if (pTaskEntry == NULL) {
|
if (pTaskEntry == NULL) {
|
||||||
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
||||||
|
|
||||||
|
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
|
||||||
|
taosArrayPush(pOrphanTasks, &oTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,15 +289,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskStatusCopy(pTaskEntry, p);
|
streamTaskStatusCopy(pTaskEntry, p);
|
||||||
if (p->checkpointId != 0) {
|
if ((p->checkpointId != 0) && p->checkpointFailed) {
|
||||||
if (p->checkpointFailed) {
|
|
||||||
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
|
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
|
||||||
p->checkpointId, p->chkpointTransId);
|
p->checkpointId, p->chkpointTransId);
|
||||||
|
|
||||||
SFailedCheckpointInfo info = {
|
SFailedCheckpointInfo info = {
|
||||||
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
|
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
|
||||||
addIntoCheckpointList(pList, &info);
|
addIntoCheckpointList(pFailedTasks, &info);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,15 +313,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
// current checkpoint is failed, rollback from the checkpoint trans
|
// current checkpoint is failed, rollback from the checkpoint trans
|
||||||
// kill the checkpoint trans and then set all tasks status to be normal
|
// kill the checkpoint trans and then set all tasks status to be normal
|
||||||
if (taosArrayGetSize(pList) > 0) {
|
if (taosArrayGetSize(pFailedTasks) > 0) {
|
||||||
bool allReady = true;
|
bool allReady = true;
|
||||||
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
||||||
taosArrayDestroy(p);
|
taosArrayDestroy(p);
|
||||||
|
|
||||||
if (allReady || snodeChanged) {
|
if (allReady || snodeChanged) {
|
||||||
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
|
||||||
SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
|
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
|
||||||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||||
pInfo->checkpointId, pInfo->transId);
|
pInfo->checkpointId, pInfo->transId);
|
||||||
|
|
||||||
|
@ -285,9 +332,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
|
||||||
|
if (taosArrayGetSize(pOrphanTasks) > 0) {
|
||||||
|
mndDropOrphanTasks(pMnode, pOrphanTasks);
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
streamMetaClearHbMsg(&req);
|
streamMetaClearHbMsg(&req);
|
||||||
|
|
||||||
taosArrayDestroy(pList);
|
taosArrayDestroy(pFailedTasks);
|
||||||
|
taosArrayDestroy(pOrphanTasks);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,66 @@
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
|
||||||
|
typedef struct SStreamTaskIter {
|
||||||
|
SStreamObj *pStream;
|
||||||
|
int32_t level;
|
||||||
|
int32_t ordinalIndex;
|
||||||
|
int32_t totalLevel;
|
||||||
|
SStreamTask *pTask;
|
||||||
|
} SStreamTaskIter;
|
||||||
|
|
||||||
|
SStreamTaskIter* createTaskIter(SStreamObj* pStream) {
|
||||||
|
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
|
||||||
|
if (pIter == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->level = -1;
|
||||||
|
pIter->ordinalIndex = 0;
|
||||||
|
pIter->pStream = pStream;
|
||||||
|
pIter->totalLevel = taosArrayGetSize(pStream->tasks);
|
||||||
|
pIter->pTask = NULL;
|
||||||
|
|
||||||
|
return pIter;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool taskIterNextTask(SStreamTaskIter* pIter) {
|
||||||
|
if (pIter->level >= pIter->totalLevel) {
|
||||||
|
pIter->pTask = NULL;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->level == -1) {
|
||||||
|
pIter->level += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while(pIter->level < pIter->totalLevel) {
|
||||||
|
SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level);
|
||||||
|
if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
|
||||||
|
pIter->level += 1;
|
||||||
|
pIter->ordinalIndex = 0;
|
||||||
|
pIter->pTask = NULL;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex);
|
||||||
|
pIter->ordinalIndex += 1;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->pTask = NULL;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamTask* taskIterGetCurrent(SStreamTaskIter* pIter) {
|
||||||
|
return pIter->pTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyTaskIter(SStreamTaskIter* pIter) {
|
||||||
|
taosMemoryFree(pIter);
|
||||||
|
}
|
||||||
|
|
||||||
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -251,16 +311,12 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
SArray *tasks = pStream->tasks;
|
SStreamTaskIter *pIter = createTaskIter(pStream);
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(tasks);
|
|
||||||
for (int32_t i = 0; i < size; i++) {
|
|
||||||
SArray *pTasks = taosArrayGetP(tasks, i);
|
|
||||||
int32_t sz = taosArrayGetSize(pTasks);
|
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
|
||||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
|
||||||
|
|
||||||
|
while (taskIterNextTask(pIter)) {
|
||||||
|
SStreamTask *pTask = taskIterGetCurrent(pIter);
|
||||||
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
|
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
|
||||||
|
destroyTaskIter(pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,6 +325,89 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
destroyTaskIter(pIter);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||||
|
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||||
|
if (pReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||||
|
pReq->taskId = pTask->id.taskId;
|
||||||
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
|
||||||
|
SEpSet epset = {0};
|
||||||
|
bool hasEpset = false;
|
||||||
|
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
|
||||||
|
terrno = code;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
||||||
|
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
|
SStreamTaskIter *pIter = createTaskIter(pStream);
|
||||||
|
|
||||||
|
while(taskIterNextTask(pIter)) {
|
||||||
|
SStreamTask *pTask = taskIterGetCurrent(pIter);
|
||||||
|
if (doSetDropAction(pMnode, pTrans, pTask) < 0) {
|
||||||
|
destroyTaskIter(pIter);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
destroyTaskIter(pIter);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
|
||||||
|
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||||
|
if (pReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pReq->head.vgId = htonl(pTask->nodeId);
|
||||||
|
pReq->taskId = pTask->taskId;
|
||||||
|
pReq->streamId = pTask->streamId;
|
||||||
|
|
||||||
|
SEpSet epset = {0};
|
||||||
|
bool hasEpset = false;
|
||||||
|
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction
|
||||||
|
terrno = code;
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
||||||
|
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||||
|
SOrphanTask* pTask = taosArrayGet(pList, i);
|
||||||
|
mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
|
||||||
|
doSetDropActionFromId(pMnode, pTrans, pTask);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -207,6 +207,10 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa
|
||||||
*/
|
*/
|
||||||
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);
|
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param
|
||||||
|
*/
|
||||||
|
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -3311,28 +3311,16 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ========================= table merge scan
|
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
|
||||||
|
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
|
||||||
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
|
|
||||||
int64_t nRows = 0;
|
|
||||||
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
|
|
||||||
if (pNum == NULL) {
|
|
||||||
nRows = pBlock->info.rows;
|
|
||||||
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
|
|
||||||
} else {
|
|
||||||
*(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows;
|
|
||||||
nRows = *(int64_t*)pNum;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nRows >= pInfo->mergeLimit) {
|
|
||||||
if (pInfo->mSkipTables == NULL) {
|
if (pInfo->mSkipTables == NULL) {
|
||||||
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
|
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
|
||||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
int bSkip = 1;
|
int bSkip = 1;
|
||||||
taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
|
if (pInfo->mSkipTables != NULL) {
|
||||||
|
taosHashPut(pInfo->mSkipTables, &uid, sizeof(uid), &bSkip, sizeof(bSkip));
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
|
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
|
||||||
|
@ -3449,10 +3437,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
}
|
}
|
||||||
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
|
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||||
|
|
||||||
if (pInfo->mergeLimit != -1) {
|
|
||||||
tableMergeScanDoSkipTable(pInfo, pBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
|
||||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
@ -3530,6 +3514,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||||
}
|
}
|
||||||
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
|
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
|
||||||
|
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
|
||||||
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
|
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
|
||||||
|
@ -3589,8 +3574,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t tableStartIdx = pInfo->tableStartIndex;
|
int32_t tableStartIdx = pInfo->tableStartIndex;
|
||||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
int32_t tableEndIdx = pInfo->tableEndIndex;
|
||||||
|
|
||||||
tSimpleHashClear(pInfo->mTableNumRows);
|
|
||||||
|
|
||||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
||||||
|
@ -3757,8 +3740,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||||
pTableScanInfo->pSortHandle = NULL;
|
pTableScanInfo->pSortHandle = NULL;
|
||||||
tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
|
|
||||||
pTableScanInfo->mTableNumRows = NULL;
|
|
||||||
taosHashCleanup(pTableScanInfo->mSkipTables);
|
taosHashCleanup(pTableScanInfo->mSkipTables);
|
||||||
pTableScanInfo->mSkipTables = NULL;
|
pTableScanInfo->mSkipTables = NULL;
|
||||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
||||||
|
@ -3847,8 +3828,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
||||||
pInfo->mTableNumRows = tSimpleHashInit(1024,
|
|
||||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
|
||||||
pInfo->mergeLimit = -1;
|
pInfo->mergeLimit = -1;
|
||||||
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
|
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
|
||||||
if (hasLimit) {
|
if (hasLimit) {
|
||||||
|
|
|
@ -82,6 +82,9 @@ struct SSortHandle {
|
||||||
int32_t extRowsMemSize;
|
int32_t extRowsMemSize;
|
||||||
int32_t srcTsSlotId;
|
int32_t srcTsSlotId;
|
||||||
SBlockOrderInfo extRowsOrderInfo;
|
SBlockOrderInfo extRowsOrderInfo;
|
||||||
|
|
||||||
|
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
|
||||||
|
void* mergeLimitReachedParam;
|
||||||
};
|
};
|
||||||
|
|
||||||
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
|
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
|
||||||
|
@ -1288,6 +1291,39 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) {
|
||||||
|
int64_t nRows = 0;
|
||||||
|
int64_t prevRows = 0;
|
||||||
|
void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
|
||||||
|
if (pNum == NULL) {
|
||||||
|
prevRows = 0;
|
||||||
|
nRows = pOrigBlk->info.rows;
|
||||||
|
tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows));
|
||||||
|
} else {
|
||||||
|
prevRows = *(int64_t*)pNum;
|
||||||
|
*(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows;
|
||||||
|
nRows = *(int64_t*)pNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t keepRows = pOrigBlk->info.rows;
|
||||||
|
if (nRows >= pHandle->mergeLimit) {
|
||||||
|
if (pHandle->mergeLimitReachedFn) {
|
||||||
|
pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
|
||||||
|
}
|
||||||
|
keepRows = pHandle->mergeLimit - prevRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
|
if (keepRows != pOrigBlk->info.rows) {
|
||||||
|
pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows);
|
||||||
|
*pExtractedBlock = true;
|
||||||
|
} else {
|
||||||
|
*pExtractedBlock = false;
|
||||||
|
pBlock = pOrigBlk;
|
||||||
|
}
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
|
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
|
||||||
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
|
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
|
||||||
|
@ -1310,10 +1346,18 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
pHandle->currMergeLimitTs = INT64_MIN;
|
pHandle->currMergeLimitTs = INT64_MIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
||||||
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
|
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
|
||||||
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
|
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
|
||||||
|
|
||||||
|
int64_t p = taosGetTimestampUs();
|
||||||
|
bool bExtractedBlock = false;
|
||||||
|
if (pBlk != NULL && pHandle->mergeLimit > 0) {
|
||||||
|
pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock);
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlk != NULL) {
|
if (pBlk != NULL) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId);
|
||||||
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
||||||
|
@ -1322,6 +1366,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlk != NULL) {
|
if (pBlk != NULL) {
|
||||||
szSort += blockDataGetSize(pBlk);
|
szSort += blockDataGetSize(pBlk);
|
||||||
|
|
||||||
|
@ -1329,8 +1374,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
if (ppBlk != NULL) {
|
if (ppBlk != NULL) {
|
||||||
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
|
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
|
||||||
blockDataMerge(tBlk, pBlk);
|
blockDataMerge(tBlk, pBlk);
|
||||||
|
if (bExtractedBlock) {
|
||||||
|
blockDataDestroy(pBlk);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SSDataBlock* tBlk = createOneDataBlock(pBlk, true);
|
SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true);
|
||||||
tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
|
tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
|
||||||
taosArrayPush(aBlkSort, &tBlk);
|
taosArrayPush(aBlkSort, &tBlk);
|
||||||
}
|
}
|
||||||
|
@ -1341,6 +1389,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
|
|
||||||
int64_t p = taosGetTimestampUs();
|
int64_t p = taosGetTimestampUs();
|
||||||
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
|
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tSimpleHashCleanup(mUidBlk);
|
tSimpleHashCleanup(mUidBlk);
|
||||||
taosArrayDestroy(aBlkSort);
|
taosArrayDestroy(aBlkSort);
|
||||||
|
@ -1379,7 +1428,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
|
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(aExtSrc);
|
taosArrayDestroy(aExtSrc);
|
||||||
|
tSimpleHashCleanup(mTableNumRows);
|
||||||
pHandle->type = SORT_SINGLESOURCE_SORT;
|
pHandle->type = SORT_SINGLESOURCE_SORT;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1858,3 +1907,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) {
|
||||||
|
pHandle->mergeLimitReachedFn = mergeLimitReachedCb;
|
||||||
|
pHandle->mergeLimitReachedParam = param;
|
||||||
|
}
|
||||||
|
|
|
@ -994,6 +994,12 @@ void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCacheDestroyIter(SCacheIter *pIter) {
|
void taosCacheDestroyIter(SCacheIter *pIter) {
|
||||||
|
for (int32_t i = 0; i < pIter->numOfObj; ++i) {
|
||||||
|
if (!pIter->pCurrent[i]) continue;
|
||||||
|
char *p = pIter->pCurrent[i]->data;
|
||||||
|
taosCacheRelease(pIter->pCacheObj, (void **)&p, false);
|
||||||
|
pIter->pCurrent[i] = NULL;
|
||||||
|
}
|
||||||
taosMemoryFreeClear(pIter->pCurrent);
|
taosMemoryFreeClear(pIter->pCurrent);
|
||||||
taosMemoryFreeClear(pIter);
|
taosMemoryFreeClear(pIter);
|
||||||
}
|
}
|
||||||
|
|
|
@ -573,6 +573,9 @@ void taosPrintSlowLog(const char *format, ...) {
|
||||||
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
|
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
|
||||||
va_end(argpointer);
|
va_end(argpointer);
|
||||||
|
|
||||||
|
if (len < 0 || len > LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2) {
|
||||||
|
len = LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2;
|
||||||
|
}
|
||||||
buffer[len++] = '\n';
|
buffer[len++] = '\n';
|
||||||
buffer[len] = 0;
|
buffer[len] = 0;
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so
|
||||||
#define taospy 2.7.10
|
#define taospy 2.7.10
|
||||||
pip3 list|grep taospy
|
pip3 list|grep taospy
|
||||||
pip3 uninstall taospy -y
|
pip3 uninstall taospy -y
|
||||||
pip3 install --default-timeout=120 taospy==2.7.12
|
pip3 install --default-timeout=120 taospy==2.7.13
|
||||||
|
|
||||||
#define taos-ws-py 0.3.1
|
#define taos-ws-py 0.3.1
|
||||||
pip3 list|grep taos-ws-py
|
pip3 list|grep taos-ws-py
|
||||||
|
|
|
@ -129,6 +129,7 @@ endi
|
||||||
|
|
||||||
$offset = $tbNum * $rowNum
|
$offset = $tbNum * $rowNum
|
||||||
$offset = $offset - 1
|
$offset = $offset - 1
|
||||||
|
print select * from $stb order by ts limit 2 offset $offset
|
||||||
sql select * from $stb order by ts limit 2 offset $offset
|
sql select * from $stb order by ts limit 2 offset $offset
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
return -1
|
return -1
|
||||||
|
|
Loading…
Reference in New Issue