Merge branch '3.0' into feat/TD-27337
This commit is contained in:
commit
7621d72e04
|
@ -40,4 +40,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
|
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
|
||||||
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
|
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
|
||||||
|
|
||||||
|
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode);
|
||||||
|
|
||||||
#endif // TDENGINE_TQ_COMMON_H
|
#endif // TDENGINE_TQ_COMMON_H
|
||||||
|
|
|
@ -304,9 +304,9 @@ typedef struct SStreamTaskId {
|
||||||
|
|
||||||
typedef struct SCheckpointInfo {
|
typedef struct SCheckpointInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t checkpointId;
|
int64_t checkpointId; // latest checkpoint id
|
||||||
|
int64_t checkpointVer; // latest checkpoint offset in wal
|
||||||
int64_t checkpointVer; // latest checkpointId version
|
int64_t checkpointTime; // latest checkpoint time
|
||||||
int64_t processedVer;
|
int64_t processedVer;
|
||||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||||
int64_t failedId; // record the latest failed checkpoint id
|
int64_t failedId; // record the latest failed checkpoint id
|
||||||
|
@ -386,6 +386,9 @@ typedef struct STaskExecStatisInfo {
|
||||||
int64_t created;
|
int64_t created;
|
||||||
int64_t init;
|
int64_t init;
|
||||||
int64_t start;
|
int64_t start;
|
||||||
|
int64_t startCheckpointId;
|
||||||
|
int64_t startCheckpointVer;
|
||||||
|
|
||||||
int64_t step1Start;
|
int64_t step1Start;
|
||||||
double step1El;
|
double step1El;
|
||||||
int64_t step2Start;
|
int64_t step2Start;
|
||||||
|
@ -442,6 +445,7 @@ struct SStreamTask {
|
||||||
SCheckpointInfo chkInfo;
|
SCheckpointInfo chkInfo;
|
||||||
STaskExec exec;
|
STaskExec exec;
|
||||||
SDataRange dataRange;
|
SDataRange dataRange;
|
||||||
|
SVersionRange step2Range;
|
||||||
SHistoryTaskInfo hTaskInfo;
|
SHistoryTaskInfo hTaskInfo;
|
||||||
STaskId streamTaskId;
|
STaskId streamTaskId;
|
||||||
STaskExecStatisInfo execInfo;
|
STaskExecStatisInfo execInfo;
|
||||||
|
@ -672,24 +676,34 @@ typedef struct {
|
||||||
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
||||||
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
||||||
|
|
||||||
|
typedef struct STaskCkptInfo {
|
||||||
|
int64_t latestId; // saved checkpoint id
|
||||||
|
int64_t latestVer; // saved checkpoint ver
|
||||||
|
int64_t latestTime; // latest checkpoint time
|
||||||
|
int64_t activeId; // current active checkpoint id
|
||||||
|
int32_t activeTransId; // checkpoint trans id
|
||||||
|
int8_t failed; // denote if the checkpoint is failed or not
|
||||||
|
} STaskCkptInfo;
|
||||||
|
|
||||||
typedef struct STaskStatusEntry {
|
typedef struct STaskStatusEntry {
|
||||||
STaskId id;
|
STaskId id;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
int32_t statusLastDuration; // to record the last duration of current status
|
int32_t statusLastDuration; // to record the last duration of current status
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
int64_t verStart; // start version in WAL, only valid for source task
|
SVersionRange verRange; // start/end version in WAL, only valid for source task
|
||||||
int64_t verEnd; // end version in WAL, only valid for source task
|
int64_t processedVer; // only valid for source task
|
||||||
int64_t processedVer; // only valid for source task
|
bool inputQChanging; // inputQ is changing or not
|
||||||
int64_t checkpointId; // current active checkpoint id
|
int64_t inputQUnchangeCounter;
|
||||||
int32_t chkpointTransId; // checkpoint trans id
|
double inputQUsed; // in MiB
|
||||||
int8_t checkpointFailed; // denote if the checkpoint is failed or not
|
double inputRate;
|
||||||
bool inputQChanging; // inputQ is changing or not
|
double sinkQuota; // existed quota size for sink task
|
||||||
int64_t inputQUnchangeCounter;
|
double sinkDataSize; // sink to dst data size
|
||||||
double inputQUsed; // in MiB
|
int64_t startTime;
|
||||||
double inputRate;
|
int64_t startCheckpointId;
|
||||||
double sinkQuota; // existed quota size for sink task
|
int64_t startCheckpointVer;
|
||||||
double sinkDataSize; // sink to dst data size
|
int64_t hTaskId;
|
||||||
|
STaskCkptInfo checkpointInfo;
|
||||||
} STaskStatusEntry;
|
} STaskStatusEntry;
|
||||||
|
|
||||||
typedef struct SStreamHbMsg {
|
typedef struct SStreamHbMsg {
|
||||||
|
|
|
@ -162,6 +162,8 @@ static const SSysDbTableSchema userStbsSchema[] = {
|
||||||
static const SSysDbTableSchema streamSchema[] = {
|
static const SSysDbTableSchema streamSchema[] = {
|
||||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
|
{.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
@ -183,7 +185,16 @@ static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
|
{.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema userTblsSchema[] = {
|
static const SSysDbTableSchema userTblsSchema[] = {
|
||||||
|
|
|
@ -822,25 +822,15 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pDst = createDataBlock();
|
SSDataBlock* pDst = createOneDataBlock(pBlock, false);
|
||||||
if (pDst == NULL) {
|
if (pDst == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDst->info = pBlock->info;
|
|
||||||
pDst->info.rows = 0;
|
|
||||||
pDst->info.capacity = 0;
|
|
||||||
pDst->info.rowSize = 0;
|
|
||||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData colInfo = {0};
|
|
||||||
SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
colInfo.info = pSrcCol->info;
|
|
||||||
blockDataAppendColInfo(pDst, &colInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(pDst, rowCount);
|
blockDataEnsureCapacity(pDst, rowCount);
|
||||||
|
|
||||||
|
|
||||||
/* may have disorder varchar data, TODO
|
/* may have disorder varchar data, TODO
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
@ -850,6 +840,8 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
||||||
|
|
|
@ -1345,9 +1345,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
|
||||||
|
|
||||||
|
// create time
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
|
||||||
|
|
||||||
|
// stream id
|
||||||
|
char buf[128] = {0};
|
||||||
|
int32_t len = tintToHex(pStream->uid, &buf[4]);
|
||||||
|
buf[2] = '0';
|
||||||
|
buf[3] = 'x';
|
||||||
|
varDataSetLen(buf, len + 2);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
|
|
||||||
|
// related fill-history stream id
|
||||||
|
memset(buf, 0, tListLen(buf));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
if (pStream->hTaskUid != 0) {
|
||||||
|
len = tintToHex(pStream->hTaskUid, &buf[4]);
|
||||||
|
varDataSetLen(buf, len + 2);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
|
} else {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, buf, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// related fill-history stream id
|
||||||
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
|
STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
@ -1510,13 +1532,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
|
||||||
|
|
||||||
|
// info
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
const char *sinkStr = "%.2fMiB";
|
const char *sinkStr = "%.2fMiB";
|
||||||
sprintf(buf, sinkStr, pe->sinkDataSize);
|
sprintf(buf, sinkStr, pe->sinkDataSize);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
// offset info
|
// offset info
|
||||||
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
||||||
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
|
sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
STR_TO_VARSTR(vbuf, buf);
|
STR_TO_VARSTR(vbuf, buf);
|
||||||
|
@ -1524,6 +1547,55 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||||
|
|
||||||
|
// start_time
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startTime, false);
|
||||||
|
|
||||||
|
// start id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointId, false);
|
||||||
|
|
||||||
|
// start ver
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointVer, false);
|
||||||
|
|
||||||
|
// checkpoint time
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
if (pe->checkpointInfo.latestTime != 0) {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
|
||||||
|
} else {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkpoint_id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false);
|
||||||
|
|
||||||
|
// checkpoint info
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
|
||||||
|
|
||||||
|
// ds_err_info
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
|
||||||
|
// history_task_id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
if (pe->hTaskId != 0) {
|
||||||
|
memset(idstr, 0, tListLen(idstr));
|
||||||
|
len = tintToHex(pe->hTaskId, &idstr[4]);
|
||||||
|
idstr[2] = '0';
|
||||||
|
idstr[3] = 'x';
|
||||||
|
varDataSetLen(idstr, len + 2);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||||
|
} else {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// history_task_status
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -294,12 +294,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskStatusCopy(pTaskEntry, p);
|
streamTaskStatusCopy(pTaskEntry, p);
|
||||||
if ((p->checkpointId != 0) && p->checkpointFailed) {
|
|
||||||
|
STaskCkptInfo *pChkInfo = &p->checkpointInfo;
|
||||||
|
if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
|
||||||
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
|
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
|
||||||
p->checkpointId, p->chkpointTransId);
|
pChkInfo->activeId, pChkInfo->activeTransId);
|
||||||
|
|
||||||
SFailedCheckpointInfo info = {
|
SFailedCheckpointInfo info = {
|
||||||
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
|
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
|
||||||
addIntoCheckpointList(pFailedTasks, &info);
|
addIntoCheckpointList(pFailedTasks, &info);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,8 +62,8 @@ SRpcMsg buildHbReq() {
|
||||||
entry.id.taskId = 5;
|
entry.id.taskId = 5;
|
||||||
entry.id.streamId = defStreamId;
|
entry.id.streamId = defStreamId;
|
||||||
|
|
||||||
entry.checkpointId = 1;
|
entry.checkpointInfo.activeId = 1;
|
||||||
entry.checkpointFailed = true;
|
entry.checkpointInfo.failed = true;
|
||||||
|
|
||||||
taosArrayPush(msg.pTaskStatus, &entry);
|
taosArrayPush(msg.pTaskStatus, &entry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
struct SSnode {
|
struct SSnode {
|
||||||
char* path;
|
|
||||||
SStreamMeta* pMeta;
|
SStreamMeta* pMeta;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
};
|
};
|
||||||
|
|
|
@ -32,6 +32,7 @@ static STaskId replaceStreamTaskId(SStreamTask *pTask) {
|
||||||
pTask->id.taskId = pTask->streamTaskId.taskId;
|
pTask->id.taskId = pTask->streamTaskId.taskId;
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
|
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
|
||||||
ASSERT(pTask->info.fillHistory);
|
ASSERT(pTask->info.fillHistory);
|
||||||
pTask->id.taskId = pId->taskId;
|
pTask->id.taskId = pId->taskId;
|
||||||
|
@ -48,46 +49,23 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
||||||
|
|
||||||
streamTaskOpenAllUpstreamInput(pTask);
|
streamTaskOpenAllUpstreamInput(pTask);
|
||||||
|
|
||||||
STaskId taskId = {0};
|
code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL);
|
||||||
if (pTask->info.fillHistory) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taskId = replaceStreamTaskId(pTask);
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
|
||||||
if (pTask->pState == NULL) {
|
|
||||||
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
|
||||||
restoreStreamTaskId(pTask, &taskId);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
|
|
||||||
SReadHandle handle = {
|
|
||||||
.checkpointId = pTask->chkInfo.checkpointId,
|
|
||||||
.vnode = NULL,
|
|
||||||
.numOfVgroups = numOfVgroups,
|
|
||||||
.pStateBackend = pTask->pState,
|
|
||||||
.fillHistory = pTask->info.fillHistory,
|
|
||||||
.winRange = pTask->dataRange.window,
|
|
||||||
};
|
|
||||||
initStreamStateAPI(&handle.api);
|
|
||||||
|
|
||||||
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId);
|
|
||||||
ASSERT(pTask->exec.pExecutor);
|
|
||||||
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
|
||||||
|
|
||||||
streamTaskResetUpstreamStageInfo(pTask);
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
|
|
||||||
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
// checkpoint ver is the kept version, handled data should be the next version.
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
if (pChkInfo->checkpointId != 0) {
|
if (pChkInfo->checkpointId != 0) {
|
||||||
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||||
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||||
|
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
|
||||||
|
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
|
||||||
|
|
||||||
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
||||||
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
@ -117,11 +95,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pSnode->path = taosStrdup(path);
|
|
||||||
if (pSnode->path == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSnode->msgCb = pOption->msgCb;
|
pSnode->msgCb = pOption->msgCb;
|
||||||
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
|
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
|
||||||
|
@ -140,7 +113,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||||
return pSnode;
|
return pSnode;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
taosMemoryFree(pSnode->path);
|
|
||||||
taosMemoryFree(pSnode);
|
taosMemoryFree(pSnode);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -156,7 +128,6 @@ void sndClose(SSnode *pSnode) {
|
||||||
streamMetaNotifyClose(pSnode->pMeta);
|
streamMetaNotifyClose(pSnode->pMeta);
|
||||||
streamMetaCommit(pSnode->pMeta);
|
streamMetaCommit(pSnode->pMeta);
|
||||||
streamMetaClose(pSnode->pMeta);
|
streamMetaClose(pSnode->pMeta);
|
||||||
taosMemoryFree(pSnode->path);
|
|
||||||
taosMemoryFree(pSnode);
|
taosMemoryFree(pSnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -126,7 +126,7 @@ int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2);
|
||||||
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
|
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
|
||||||
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
|
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
|
||||||
void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key);
|
void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key);
|
||||||
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
|
|
||||||
|
|
||||||
// STSDBRowIter
|
// STSDBRowIter
|
||||||
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
|
|
@ -711,22 +711,6 @@ end:
|
||||||
|
|
||||||
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
|
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
|
||||||
|
|
||||||
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->info.fillHistory);
|
|
||||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
|
||||||
|
|
||||||
pTask->id.streamId = pTask->streamTaskId.streamId;
|
|
||||||
pTask->id.taskId = pTask->streamTaskId.taskId;
|
|
||||||
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
|
|
||||||
ASSERT(pTask->info.fillHistory);
|
|
||||||
pTask->id.taskId = pId->taskId;
|
|
||||||
pTask->id.streamId = pId->streamId;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
|
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
|
||||||
|
@ -736,74 +720,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode);
|
||||||
STaskId taskId = {0};
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (pTask->info.fillHistory) {
|
return code;
|
||||||
taskId = replaceStreamTaskId(pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
|
||||||
if (pTask->pState == NULL) {
|
|
||||||
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
|
||||||
if (pTask->info.fillHistory) {
|
|
||||||
restoreStreamTaskId(pTask, &taskId);
|
|
||||||
}
|
|
||||||
|
|
||||||
SReadHandle handle = {
|
|
||||||
.checkpointId = pTask->chkInfo.checkpointId,
|
|
||||||
.vnode = pTq->pVnode,
|
|
||||||
.initTqReader = 1,
|
|
||||||
.pStateBackend = pTask->pState,
|
|
||||||
.fillHistory = pTask->info.fillHistory,
|
|
||||||
.winRange = pTask->dataRange.window,
|
|
||||||
};
|
|
||||||
|
|
||||||
initStorageAPI(&handle.api);
|
|
||||||
|
|
||||||
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
|
|
||||||
if (pTask->exec.pExecutor == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
|
||||||
STaskId taskId = {0};
|
|
||||||
if (pTask->info.fillHistory) {
|
|
||||||
taskId = replaceStreamTaskId(pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
|
||||||
if (pTask->pState == NULL) {
|
|
||||||
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
|
||||||
restoreStreamTaskId(pTask, &taskId);
|
|
||||||
}
|
|
||||||
|
|
||||||
SReadHandle handle = {
|
|
||||||
.checkpointId = pTask->chkInfo.checkpointId,
|
|
||||||
.vnode = NULL,
|
|
||||||
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList),
|
|
||||||
.pStateBackend = pTask->pState,
|
|
||||||
.fillHistory = pTask->info.fillHistory,
|
|
||||||
.winRange = pTask->dataRange.window,
|
|
||||||
};
|
|
||||||
|
|
||||||
initStorageAPI(&handle.api);
|
|
||||||
|
|
||||||
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
|
|
||||||
if (pTask->exec.pExecutor == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
|
@ -839,12 +758,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
|
|
||||||
streamTaskResetUpstreamStageInfo(pTask);
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
|
|
||||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
// checkpoint ver is the kept version, handled data should be the next version.
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
if (pChkInfo->checkpointId != 0) {
|
if (pChkInfo->checkpointId != 0) {
|
||||||
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||||
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||||
|
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
|
||||||
|
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
|
||||||
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
@ -890,33 +812,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
|
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
||||||
SVersionRange* pRange = &pTask->dataRange.range;
|
SVersionRange* pStep2Range = &pTask->step2Range;
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
||||||
pTask->execInfo.step2Start = taosGetTimestampMs();
|
pTask->execInfo.step2Start = taosGetTimestampMs();
|
||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer,
|
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer,
|
||||||
pRange->maxVer, 0.0);
|
pStep2Range->maxVer, 0.0);
|
||||||
streamTaskPutTranstateIntoInputQ(pTask);
|
streamTaskPutTranstateIntoInputQ(pTask);
|
||||||
streamExecTask(pTask); // exec directly
|
streamExecTask(pTask); // exec directly
|
||||||
} else {
|
} else {
|
||||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
|
||||||
", do secondary scan-history from WAL after halt the related stream task:%s",
|
", do secondary scan-history from WAL after halt the related stream task:%s",
|
||||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
|
id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
|
||||||
pStreamTask->id.idStr);
|
pStreamTask->id.idStr);
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
|
|
||||||
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
|
||||||
|
|
||||||
int64_t dstVer = pTask->dataRange.range.minVer;
|
int64_t dstVer =pStep2Range->minVer;
|
||||||
pTask->chkInfo.nextProcessVer = dstVer;
|
pTask->chkInfo.nextProcessVer = dstVer;
|
||||||
|
|
||||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
||||||
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||||
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
|
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
||||||
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
|
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
|
||||||
|
|
|
@ -242,21 +242,23 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
|
||||||
// todo handle memory error
|
// todo handle memory error
|
||||||
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
|
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int64_t maxVer = pTask->dataRange.range.maxVer;
|
int64_t maxVer = pTask->step2Range.maxVer;
|
||||||
|
|
||||||
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
|
if ((pTask->info.fillHistory == 1) && ver > maxVer) {
|
||||||
if (!pTask->status.appendTranstateBlock) {
|
if (!pTask->status.appendTranstateBlock) {
|
||||||
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
|
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
|
||||||
", not scan wal anymore, add transfer-state block into inputQ",
|
", not scan wal anymore, add transfer-state block into inputQ",
|
||||||
id, ver, maxVer);
|
id, ver, maxVer);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
|
||||||
|
id, pTask->step2Range.minVer, maxVer, el);
|
||||||
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
|
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
|
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
|
||||||
id, ver, maxVer);
|
", not scan wal",
|
||||||
|
id, ver, pTask->step2Range.minVer, maxVer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -389,7 +391,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
||||||
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
|
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,69 @@ typedef struct STaskUpdateEntry {
|
||||||
int32_t transId;
|
int32_t transId;
|
||||||
} STaskUpdateEntry;
|
} STaskUpdateEntry;
|
||||||
|
|
||||||
|
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
||||||
|
ASSERT(pTask->info.fillHistory);
|
||||||
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
|
|
||||||
|
pTask->id.streamId = pTask->streamTaskId.streamId;
|
||||||
|
pTask->id.taskId = pTask->streamTaskId.taskId;
|
||||||
|
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
|
||||||
|
ASSERT(pTask->info.fillHistory);
|
||||||
|
pTask->id.taskId = pId->taskId;
|
||||||
|
pTask->id.streamId = pId->streamId;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) {
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
STaskId taskId = {0};
|
||||||
|
|
||||||
|
if (pTask->info.fillHistory) {
|
||||||
|
taskId = replaceStreamTaskId(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1);
|
||||||
|
if (pTask->pState == NULL) {
|
||||||
|
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->info.fillHistory) {
|
||||||
|
restoreStreamTaskId(pTask, &taskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
SReadHandle handle = {
|
||||||
|
.checkpointId = pTask->chkInfo.checkpointId,
|
||||||
|
.pStateBackend = pTask->pState,
|
||||||
|
.fillHistory = pTask->info.fillHistory,
|
||||||
|
.winRange = pTask->dataRange.window,
|
||||||
|
};
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
handle.vnode = pVnode;
|
||||||
|
handle.initTqReader = 1;
|
||||||
|
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
|
handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
|
}
|
||||||
|
|
||||||
|
initStorageAPI(&handle.api);
|
||||||
|
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
|
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
|
||||||
|
if (pTask->exec.pExecutor == NULL) {
|
||||||
|
tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
|
|
@ -37,7 +37,6 @@ typedef struct {
|
||||||
int64_t cid;
|
int64_t cid;
|
||||||
int64_t now;
|
int64_t now;
|
||||||
TSKEY nextKey;
|
TSKEY nextKey;
|
||||||
TSKEY maxDelKey;
|
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
int32_t expLevel;
|
int32_t expLevel;
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
|
@ -161,15 +160,13 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
|
||||||
int64_t numRecord = 0;
|
int64_t numRecord = 0;
|
||||||
SMetaInfo info;
|
SMetaInfo info;
|
||||||
|
|
||||||
if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) {
|
if (committer->tsdb->imem->nDel == 0) {
|
||||||
if (committer->ctx->maxKey < committer->ctx->maxDelKey) {
|
goto _exit;
|
||||||
committer->ctx->nextKey = committer->ctx->maxKey + 1;
|
|
||||||
} else {
|
|
||||||
committer->ctx->nextKey = TSKEY_MAX;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do not need to write tomb data if there is no ts data
|
||||||
|
bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData);
|
||||||
|
|
||||||
committer->ctx->tbid->suid = 0;
|
committer->ctx->tbid->suid = 0;
|
||||||
committer->ctx->tbid->uid = 0;
|
committer->ctx->tbid->uid = 0;
|
||||||
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
|
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
|
||||||
|
@ -196,9 +193,11 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
|
||||||
record->skey = TMAX(record->skey, committer->ctx->minKey);
|
record->skey = TMAX(record->skey, committer->ctx->minKey);
|
||||||
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
|
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
|
||||||
|
|
||||||
numRecord++;
|
if (!skip) {
|
||||||
code = tsdbFSetWriteTombRecord(committer->writer, record);
|
numRecord++;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
code = tsdbFSetWriteTombRecord(committer->writer, record);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbIterMergerNext(committer->tombIterMerger);
|
code = tsdbIterMergerNext(committer->tombIterMerger);
|
||||||
|
@ -480,28 +479,11 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
|
||||||
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
|
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||||
|
|
||||||
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
|
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
|
||||||
if (delData->sKey < committer->ctx->nextKey) {
|
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey);
|
||||||
committer->ctx->nextKey = delData->sKey;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
committer->ctx->maxDelKey = TSKEY_MIN;
|
|
||||||
TSKEY minKey = TSKEY_MAX;
|
|
||||||
TSKEY maxKey = TSKEY_MIN;
|
|
||||||
if (TARRAY2_SIZE(committer->fsetArr) > 0) {
|
|
||||||
STFileSet *fset = TARRAY2_LAST(committer->fsetArr);
|
|
||||||
tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &committer->ctx->maxDelKey);
|
|
||||||
|
|
||||||
fset = TARRAY2_FIRST(committer->fsetArr);
|
|
||||||
tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &maxKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (committer->ctx->nextKey < TMIN(tsdb->imem->minKey, minKey)) {
|
|
||||||
committer->ctx->nextKey = TMIN(tsdb->imem->minKey, minKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
|
|
@ -124,7 +124,7 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2) {
|
||||||
if (p1->pks[0].val == p2->pks[0].val) {
|
if (p1->pks[0].val == p2->pks[0].val) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return p1->pks[0].val > p2->pks[0].val? 1:-1;
|
return tValueCompare(&p1->pks[0], &p2->pks[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5037,7 +5037,13 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
|
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
int64_t ts = asc ? pReader->info.window.skey - 1 : pReader->info.window.ekey + 1;
|
|
||||||
|
int64_t ts = 0;
|
||||||
|
if (asc) {
|
||||||
|
ts = (pReader->info.window.skey > INT64_MIN)? pReader->info.window.skey-1:pReader->info.window.skey;
|
||||||
|
} else {
|
||||||
|
ts = (pReader->info.window.ekey < INT64_MAX)? pReader->info.window.ekey + 1:pReader->info.window.ekey;
|
||||||
|
}
|
||||||
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
|
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
// no data in files, let's try buffer in memory
|
||||||
|
|
|
@ -136,20 +136,51 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
|
||||||
|
|
||||||
if (numOfPks > 0) {
|
if (numOfPks > 0) {
|
||||||
pKey->pks[0].type = type;
|
pKey->pks[0].type = type;
|
||||||
if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
|
|
||||||
|
if (IS_NUMERIC_TYPE(type)) {
|
||||||
if (asc) {
|
if (asc) {
|
||||||
switch(pKey->pks[0].type) {
|
switch(type) {
|
||||||
case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MIN;break;
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MIN;break;
|
pKey->pks[0].val = INT64_MIN;
|
||||||
case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MIN;break;
|
break;
|
||||||
case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MIN;break;
|
}
|
||||||
|
case TSDB_DATA_TYPE_INT:{
|
||||||
|
int32_t min = INT32_MIN;
|
||||||
|
memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:{
|
||||||
|
int16_t min = INT16_MIN;
|
||||||
|
memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:{
|
||||||
|
int8_t min = INT8_MIN;
|
||||||
|
memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT: {
|
||||||
|
pKey->pks[0].val = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
switch(pKey->pks[0].type) {
|
switch(type) {
|
||||||
case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MAX;break;
|
case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MAX;break;
|
||||||
case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MAX;break;
|
case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MAX;break;
|
||||||
case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MAX;break;
|
case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MAX;break;
|
||||||
case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MAX;break;
|
case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MAX;break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:pKey->pks[0].val = UINT64_MAX;break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:pKey->pks[0].val = UINT32_MAX;break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:pKey->pks[0].val = UINT16_MAX;break;
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:pKey->pks[0].val = UINT8_MAX;break;
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -2093,12 +2093,14 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
tDecoderInit(pCoder, pReq, len);
|
tDecoderInit(pCoder, pReq, len);
|
||||||
tDecodeDeleteRes(pCoder, pRes);
|
tDecodeDeleteRes(pCoder, pRes);
|
||||||
|
|
||||||
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
|
if (pRes->affectedRows > 0) {
|
||||||
uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
|
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
|
||||||
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
|
uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
|
||||||
if (code) goto _err;
|
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
|
||||||
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs);
|
if (code) goto _err;
|
||||||
if (code) goto _err;
|
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len);
|
code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len);
|
||||||
|
@ -2296,5 +2298,5 @@ _OVER:
|
||||||
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync);
|
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -926,8 +926,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
|
||||||
pStreamInfo->fillHistoryWindow = *pWindow;
|
pStreamInfo->fillHistoryWindow = *pWindow;
|
||||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
|
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
|
||||||
|
|
||||||
qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
|
qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 "-%" PRId64 ", window:%" PRId64
|
||||||
" - %" PRId64,
|
"-%" PRId64,
|
||||||
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
|
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
|
||||||
pWindow->ekey);
|
pWindow->ekey);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -3940,15 +3940,9 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
if (FUNCTION_TYPE_LAST == funcType) {
|
if (FUNCTION_TYPE_LAST == funcType) {
|
||||||
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
|
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
|
||||||
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
|
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
|
||||||
if (pFunc->hasPk) {
|
}
|
||||||
if (LIST_LENGTH(pFunc->pParameterList) != 2) {
|
if (pFunc->hasPk) {
|
||||||
planError("last func which has pk but its parameter list length is not %d", 2);
|
nodesListMakeAppend(&cxt.pOtherCols, nodesListGetNode(pFunc->pParameterList, LIST_LENGTH(pFunc->pParameterList) - 1));
|
||||||
nodesClearList(cxt.pLastCols);
|
|
||||||
taosArrayDestroy(isDuplicateCol);
|
|
||||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
|
||||||
}
|
|
||||||
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 1), lastRowScanOptSetColDataType, &cxt);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
|
|
@ -309,6 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
|
||||||
|
|
||||||
pCKInfo->checkpointId = pCKInfo->checkpointingId;
|
pCKInfo->checkpointId = pCKInfo->checkpointingId;
|
||||||
pCKInfo->checkpointVer = pCKInfo->processedVer;
|
pCKInfo->checkpointVer = pCKInfo->processedVer;
|
||||||
|
pCKInfo->checkpointTime = pCKInfo->startTs;
|
||||||
|
|
||||||
streamTaskClearCheckInfo(p, false);
|
streamTaskClearCheckInfo(p, false);
|
||||||
taosThreadMutexUnlock(&p->lock);
|
taosThreadMutexUnlock(&p->lock);
|
||||||
|
|
|
@ -591,19 +591,16 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||||
tFreeStreamTask(pTask);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||||
|
|
||||||
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||||
tFreeStreamTask(pTask);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
tFreeStreamTask(pTask);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -960,11 +957,18 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||||
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
|
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
|
||||||
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
|
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
|
if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
|
if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
|
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
|
||||||
|
@ -999,11 +1003,19 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
||||||
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
|
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
|
||||||
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
|
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1;
|
if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
|
||||||
|
|
||||||
entry.id.taskId = taskId;
|
entry.id.taskId = taskId;
|
||||||
taosArrayPush(pReq->pTaskStatus, &entry);
|
taosArrayPush(pReq->pTaskStatus, &entry);
|
||||||
|
@ -1105,7 +1117,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
.status = streamTaskGetStatus(*pTask)->state,
|
.status = streamTaskGetStatus(*pTask)->state,
|
||||||
.nodeId = hbMsg.vgId,
|
.nodeId = hbMsg.vgId,
|
||||||
.stage = pMeta->stage,
|
.stage = pMeta->stage,
|
||||||
|
|
||||||
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
||||||
|
.startTime = (*pTask)->execInfo.start,
|
||||||
|
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
|
||||||
|
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
|
||||||
|
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
|
||||||
|
.hTaskId = (*pTask)->hTaskInfo.id.taskId,
|
||||||
|
|
||||||
|
.startCheckpointId = (*pTask)->execInfo.startCheckpointId,
|
||||||
|
.startCheckpointVer = (*pTask)->execInfo.startCheckpointVer,
|
||||||
};
|
};
|
||||||
|
|
||||||
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
|
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
|
||||||
|
@ -1115,11 +1136,11 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pTask)->chkInfo.checkpointingId != 0) {
|
if ((*pTask)->chkInfo.checkpointingId != 0) {
|
||||||
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
|
entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
|
||||||
entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
|
entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId;
|
||||||
entry.chkpointTransId = (*pTask)->chkInfo.transId;
|
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
|
||||||
|
|
||||||
if (entry.checkpointFailed) {
|
if (entry.checkpointInfo.failed) {
|
||||||
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1130,7 +1151,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
entry.processedVer = (*pTask)->chkInfo.processedVer;
|
entry.processedVer = (*pTask)->chkInfo.processedVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
|
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
|
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
|
||||||
|
|
|
@ -44,7 +44,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
|
||||||
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
|
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskSetReady(SStreamTask* pTask) {
|
int32_t streamTaskSetReady(SStreamTask* pTask) {
|
||||||
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
|
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
|
||||||
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||||
|
|
||||||
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
|
@ -868,8 +868,10 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
|
||||||
} else {
|
} else {
|
||||||
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
|
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
|
||||||
// [pTask->dataRange.range.maxVer, ver1]
|
// [pTask->dataRange.range.maxVer, ver1]
|
||||||
pRange->minVer = walScanStartVer;
|
pTask->step2Range.minVer = walScanStartVer;
|
||||||
pRange->maxVer = nextProcessVer - 1;
|
pTask->step2Range.maxVer = nextProcessVer - 1;
|
||||||
|
stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, pTask->id.idStr,
|
||||||
|
pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -849,13 +849,15 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
|
||||||
pDst->inputQUsed = pSrc->inputQUsed;
|
pDst->inputQUsed = pSrc->inputQUsed;
|
||||||
pDst->inputRate = pSrc->inputRate;
|
pDst->inputRate = pSrc->inputRate;
|
||||||
pDst->processedVer = pSrc->processedVer;
|
pDst->processedVer = pSrc->processedVer;
|
||||||
pDst->verStart = pSrc->verStart;
|
pDst->verRange = pSrc->verRange;
|
||||||
pDst->verEnd = pSrc->verEnd;
|
|
||||||
pDst->sinkQuota = pSrc->sinkQuota;
|
pDst->sinkQuota = pSrc->sinkQuota;
|
||||||
pDst->sinkDataSize = pSrc->sinkDataSize;
|
pDst->sinkDataSize = pSrc->sinkDataSize;
|
||||||
pDst->checkpointId = pSrc->checkpointId;
|
pDst->checkpointInfo = pSrc->checkpointInfo;
|
||||||
pDst->checkpointFailed = pSrc->checkpointFailed;
|
pDst->startCheckpointId = pSrc->startCheckpointId;
|
||||||
pDst->chkpointTransId = pSrc->chkpointTransId;
|
pDst->startCheckpointVer = pSrc->startCheckpointVer;
|
||||||
|
|
||||||
|
pDst->startTime = pSrc->startTime;
|
||||||
|
pDst->hTaskId = pSrc->hTaskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
||||||
|
|
|
@ -47,6 +47,7 @@
|
||||||
|
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_error.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_error.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_varchar.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func_group.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func_group.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_expr.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_expr.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py
|
||||||
|
|
|
@ -221,7 +221,7 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
||||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(215, 230))
|
tdSql.checkEqual(True, len(tdSql.queryResult) in range(226, 241))
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
||||||
tdSql.checkEqual(54, len(tdSql.queryResult))
|
tdSql.checkEqual(54, len(tdSql.queryResult))
|
||||||
|
|
|
@ -147,11 +147,11 @@ class TDTestCase:
|
||||||
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
|
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
|
||||||
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
|
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
|
||||||
print(tdSql.queryResult)
|
print(tdSql.queryResult)
|
||||||
tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
|
tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
|
||||||
tdSql.execute(f'drop stream {stream_name}')
|
tdSql.execute(f'drop stream {stream_name}')
|
||||||
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
|
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
|
||||||
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
|
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
|
||||||
tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
|
tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
|
||||||
tdSql.execute(f'drop database {self.dbname}')
|
tdSql.execute(f'drop database {self.dbname}')
|
||||||
def run(self):
|
def run(self):
|
||||||
self.drop_ntb_check()
|
self.drop_ntb_check()
|
||||||
|
|
|
@ -643,6 +643,11 @@ class TDTestCase:
|
||||||
tdSql.checkData(7, 1, 8)
|
tdSql.checkData(7, 1, 8)
|
||||||
tdSql.checkData(7, 2, 8)
|
tdSql.checkData(7, 2, 8)
|
||||||
|
|
||||||
|
tdSql.query('select ts, last(pk) from d1.st order by pk')
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(0, 1, 8)
|
||||||
|
|
||||||
tdSql.execute('drop database pk_func')
|
tdSql.execute('drop database pk_func')
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
|
@ -0,0 +1,306 @@
|
||||||
|
import sys
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.dnodes import tdDnodes
|
||||||
|
from math import inf
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def caseDescription(self):
|
||||||
|
'''
|
||||||
|
case1<shenglian zhou>: [TD-]
|
||||||
|
'''
|
||||||
|
return
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVer=1):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), True)
|
||||||
|
self.conn = conn
|
||||||
|
|
||||||
|
def restartTaosd(self, index=1, dbname="db"):
|
||||||
|
tdDnodes.stop(index)
|
||||||
|
tdDnodes.startWithoutSleep(index)
|
||||||
|
tdSql.execute(f"use pk_varchar")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
print("running {}".format(__file__))
|
||||||
|
tdSql.execute("drop database if exists pk_varchar")
|
||||||
|
tdSql.execute("create database if not exists pk_varchar")
|
||||||
|
tdSql.execute('use pk_varchar')
|
||||||
|
tdSql.execute('drop database IF EXISTS d1;')
|
||||||
|
|
||||||
|
tdSql.execute('drop database IF EXISTS d2;')
|
||||||
|
|
||||||
|
tdSql.execute('create database d1 vgroups 1')
|
||||||
|
|
||||||
|
tdSql.execute('use d1;')
|
||||||
|
|
||||||
|
tdSql.execute('create table st(ts timestamp, pk varchar(256) primary key, f int) tags(t int);')
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '1', 1);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '2', 2);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '3', 3);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '4', 4);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '1', 1);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '4', 4);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '3', 3);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '2', 2);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '6', 6);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '5', 5);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '8', 8);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '7', 7);")
|
||||||
|
|
||||||
|
tdSql.query('select first(*) from d1.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(0, 1, '1')
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(1, 1, '3')
|
||||||
|
tdSql.checkData(1, 2, 3)
|
||||||
|
|
||||||
|
tdSql.query('select last(*) from d1.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(0, 1, '6')
|
||||||
|
tdSql.checkData(0, 2, 6)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, '8')
|
||||||
|
tdSql.checkData(1, 2, 8)
|
||||||
|
|
||||||
|
tdSql.query('select last_row(*) from d1.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(0, 1, '6')
|
||||||
|
tdSql.checkData(0, 2, 6)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, '8')
|
||||||
|
tdSql.checkData(1, 2, 8)
|
||||||
|
|
||||||
|
tdSql.query('select ts,diff(f) from d1.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(0, 1, 0)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, 4)
|
||||||
|
tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(2, 1, -1)
|
||||||
|
tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(3, 1, 5)
|
||||||
|
|
||||||
|
tdSql.query('select irate(f) from d1.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, 4.0)
|
||||||
|
tdSql.checkData(1, 0, 5.0)
|
||||||
|
|
||||||
|
tdSql.query('select ts,derivative(f, 1s, 0) from d1.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(0, 1, 0.0)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, 4.0)
|
||||||
|
tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(2, 1, -1.0)
|
||||||
|
tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(3, 1, 5.0)
|
||||||
|
|
||||||
|
tdSql.query('select twa(f) from d1.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, 2.0)
|
||||||
|
tdSql.checkData(1, 0, 3.5)
|
||||||
|
|
||||||
|
tdSql.query('select ts,pk,unique(f) from d1.st partition by tbname order by tbname,ts,pk;')
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(0, 1, '1')
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(1, 1, '2')
|
||||||
|
tdSql.checkData(1, 2, 2)
|
||||||
|
tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(2, 1, '4')
|
||||||
|
tdSql.checkData(2, 2, 4)
|
||||||
|
tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(3, 1, '5')
|
||||||
|
tdSql.checkData(3, 2, 5)
|
||||||
|
tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(4, 1, '6')
|
||||||
|
tdSql.checkData(4, 2, 6)
|
||||||
|
tdSql.checkData(5, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(5, 1, '3')
|
||||||
|
tdSql.checkData(5, 2, 3)
|
||||||
|
tdSql.checkData(6, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(6, 1, '4')
|
||||||
|
tdSql.checkData(6, 2, 4)
|
||||||
|
tdSql.checkData(7, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(7, 1, '2')
|
||||||
|
tdSql.checkData(7, 2, 2)
|
||||||
|
tdSql.checkData(8, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(8, 1, '7')
|
||||||
|
tdSql.checkData(8, 2, 7)
|
||||||
|
tdSql.checkData(9, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(9, 1, '8')
|
||||||
|
tdSql.checkData(9, 2, 8)
|
||||||
|
|
||||||
|
tdSql.query('select * from d1.st order by ts limit 2;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(0, 1, '1')
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
tdSql.checkData(0, 3, 1)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(1, 1, '2')
|
||||||
|
tdSql.checkData(1, 2, 2)
|
||||||
|
tdSql.checkData(1, 3, 1)
|
||||||
|
|
||||||
|
tdSql.execute('create database d2 vgroups 2')
|
||||||
|
|
||||||
|
tdSql.execute('use d2;')
|
||||||
|
|
||||||
|
tdSql.execute('create table st(ts timestamp, pk varchar(256) primary key, f int) tags(t int);')
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '1', 1);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '2', 2);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '3', 3);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '4', 4);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '1', 1);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '4', 4);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '3', 3);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '2', 2);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '6', 6);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '5', 5);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '8', 8);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '7', 7);")
|
||||||
|
|
||||||
|
tdSql.query('select first(*) from d2.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(0, 1, '1')
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(1, 1, '3')
|
||||||
|
tdSql.checkData(1, 2, 3)
|
||||||
|
|
||||||
|
tdSql.query('select last(*) from d2.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(0, 1, '6')
|
||||||
|
tdSql.checkData(0, 2, 6)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, '8')
|
||||||
|
tdSql.checkData(1, 2, 8)
|
||||||
|
|
||||||
|
tdSql.query('select last_row(*) from d2.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(0, 1, '6')
|
||||||
|
tdSql.checkData(0, 2, 6)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, '8')
|
||||||
|
tdSql.checkData(1, 2, 8)
|
||||||
|
|
||||||
|
tdSql.query('select ts,diff(f) from d2.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(0, 1, 0)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, 4)
|
||||||
|
tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(2, 1, -1)
|
||||||
|
tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(3, 1, 5)
|
||||||
|
|
||||||
|
tdSql.query('select irate(f) from d2.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, 4.0)
|
||||||
|
tdSql.checkData(1, 0, 5.0)
|
||||||
|
|
||||||
|
tdSql.query('select ts,derivative(f, 1s, 0) from d2.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(0, 1, 0.0)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(1, 1, 4.0)
|
||||||
|
tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(2, 1, -1.0)
|
||||||
|
tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(3, 1, 5.0)
|
||||||
|
|
||||||
|
tdSql.query('select twa(f) from d2.st partition by tbname order by tbname;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, 2.0)
|
||||||
|
tdSql.checkData(1, 0, 3.5)
|
||||||
|
|
||||||
|
tdSql.query('select ts,pk,unique(f) from d2.st partition by tbname order by tbname,ts,pk;')
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(0, 1, '1')
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(1, 1, '2')
|
||||||
|
tdSql.checkData(1, 2, 2)
|
||||||
|
tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(2, 1, '4')
|
||||||
|
tdSql.checkData(2, 2, 4)
|
||||||
|
tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(3, 1, '5')
|
||||||
|
tdSql.checkData(3, 2, 5)
|
||||||
|
tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(4, 1, '6')
|
||||||
|
tdSql.checkData(4, 2, 6)
|
||||||
|
tdSql.checkData(5, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(5, 1, '3')
|
||||||
|
tdSql.checkData(5, 2, 3)
|
||||||
|
tdSql.checkData(6, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(6, 1, '4')
|
||||||
|
tdSql.checkData(6, 2, 4)
|
||||||
|
tdSql.checkData(7, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
|
||||||
|
tdSql.checkData(7, 1, '2')
|
||||||
|
tdSql.checkData(7, 2, 2)
|
||||||
|
tdSql.checkData(8, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(8, 1, '7')
|
||||||
|
tdSql.checkData(8, 2, 7)
|
||||||
|
tdSql.checkData(9, 0, datetime.datetime(2021, 4, 19, 0, 0, 2))
|
||||||
|
tdSql.checkData(9, 1, '8')
|
||||||
|
tdSql.checkData(9, 2, 8)
|
||||||
|
|
||||||
|
tdSql.query('select * from d2.st order by ts limit 2;')
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(0, 1, '1')
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
tdSql.checkData(0, 3, 1)
|
||||||
|
tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0))
|
||||||
|
tdSql.checkData(1, 1, '2')
|
||||||
|
tdSql.checkData(1, 2, 2)
|
||||||
|
tdSql.checkData(1, 3, 1)
|
||||||
|
|
||||||
|
tdSql.execute('drop database pk_varchar')
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue