Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
369e57f3a6
|
@ -393,7 +393,7 @@ pipeline {
|
|||
agent{label " Mac_catalina "}
|
||||
steps {
|
||||
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
|
||||
timeout(time: 30, unit: 'MINUTES'){
|
||||
timeout(time: 60, unit: 'MINUTES'){
|
||||
pre_test()
|
||||
pre_test_build_mac()
|
||||
}
|
||||
|
|
|
@ -58,6 +58,8 @@ window_clause: {
|
|||
SESSION(ts_col, tol_val)
|
||||
| STATE_WINDOW(col)
|
||||
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
|
||||
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
|
||||
| COUNT_WINDOW(count_val[, sliding_val])
|
||||
|
||||
interp_clause:
|
||||
RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val)
|
||||
|
@ -95,6 +97,7 @@ The list of currently supported Hints is as follows:
|
|||
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
|
||||
| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp |
|
||||
| SMALLDATA_TS_SORT| None | When sorting the supertable rows by timestamp, if the length of query columns >= 256, and there are relatively few rows, this hint can improve performance. | Sorting the supertable rows by timestamp |
|
||||
| SKIP_TSMA| None| To explicitly disable tsma optimization for select query|Select query with agg funcs|
|
||||
|
||||
For example:
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ A PARTITION BY clause is processed as follows:
|
|||
select _wstart, location, max(current) from meters partition by location interval(10m)
|
||||
```
|
||||
|
||||
The most common usage of PARTITION BY is partitioning the data in subtables by tags then perform computation when querying data in a supertable. More specifically, `PARTITION BY TBNAME` partitions the data of each subtable into a single timeline, and this method facilitates the statistical analysis in many use cases of processing timeseries data. For example, calculate the average voltage of each meter every 10 minutes£º
|
||||
The most common usage of PARTITION BY is partitioning the data in subtables by tags then perform computation when querying data in a supertable. More specifically, `PARTITION BY TBNAME` partitions the data of each subtable into a single timeline, and this method facilitates the statistical analysis in many use cases of processing timeseries data. For example, calculate the average voltage of each meter every 10 minutes£º
|
||||
```sql
|
||||
select _wstart, tbname, avg(voltage) from meters partition by tbname interval(10m)
|
||||
```
|
||||
|
@ -44,12 +44,12 @@ Aggregation by time window is supported in TDengine. For example, in the case wh
|
|||
window_clause: {
|
||||
SESSION(ts_col, tol_val)
|
||||
| STATE_WINDOW(col)
|
||||
| INTERVAL(interval_val [, offset]) [SLIDING (sliding_value)] [FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})]
|
||||
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_value)] [FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})]
|
||||
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
|
||||
}
|
||||
```
|
||||
|
||||
Both interval_val and sliding_value are time durations which have 3 forms of representation.
|
||||
Both interval_val and sliding_value are time durations, and interval_offset is the window offset, interval_offset must be less than interval_val, There are 3 forms of representation.
|
||||
- INTERVAL(1s, 500a) SLIDING(1s), the unit char should be any one of a (millisecond), b (nanosecond), d (day), h (hour), m (minute), n (month), s (second), u (microsecond), w (week), y (year).
|
||||
- INTERVAL(1000, 500) SLIDING(1000), the unit will the same as the queried database, if there are more than one databases, higher precision will be used.
|
||||
- INTERVAL('1s', '500a') SLIDING('1s'), unit must be specified, no spaces allowed.
|
||||
|
|
|
@ -58,6 +58,8 @@ window_clause: {
|
|||
SESSION(ts_col, tol_val)
|
||||
| STATE_WINDOW(col)
|
||||
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
|
||||
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
|
||||
| COUNT_WINDOW(count_val[, sliding_val])
|
||||
|
||||
interp_clause:
|
||||
RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val)
|
||||
|
@ -95,6 +97,7 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
|
|||
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
|
||||
| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 |
|
||||
| SMALLDATA_TS_SORT| 无 | 超级表的数据按时间戳排序时, 查询列长度大于等于256, 但是行数不多, 使用这个提示, 可以提高性能 | 超级表的数据按时间戳排序时 |
|
||||
| SKIP_TSMA | 无 | 用于显示的禁用TSMA查询优化 | 带Agg函数的查询语句 |
|
||||
|
||||
举例:
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ window_clause: {
|
|||
}
|
||||
```
|
||||
|
||||
其中,interval_val 和 sliding_val 都表示时间段, 语法上支持三种方式,举例说明如下:
|
||||
其中,interval_val 和 sliding_val 都表示时间段,interval_offset 表示窗口偏移量,interval_offset 必须小于 interval_val,语法上支持三种方式,举例说明如下:
|
||||
- INTERVAL(1s, 500a) SLIDING(1s), 自带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微妙), w (周), y (年).
|
||||
- INTERVAL(1000, 500) SLIDING(1000), 不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库.
|
||||
- INTERVAL('1s', '500a') SLIDING('1s'), 自带时间单位的字符串形式,字符串内部不能有任何空格等其它字符.
|
||||
|
|
|
@ -896,7 +896,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta);
|
|||
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
||||
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
||||
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||
void streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
|
|
|
@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = {
|
|||
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "checkpoint_interval", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
};
|
||||
|
|
|
@ -1213,27 +1213,31 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
SMDropStreamReq dropReq = {0};
|
||||
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
|
||||
mError("invalid drop stream msg recv, discarded");
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pStream = mndAcquireStream(pMnode, dropReq.name);
|
||||
mDebug("recv drop stream:%s msg", dropReq.name);
|
||||
|
||||
pStream = mndAcquireStream(pMnode, dropReq.name);
|
||||
if (pStream == NULL) {
|
||||
if (dropReq.igNotExists) {
|
||||
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
|
||||
mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
return 0;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
mError("stream:%s not exist failed to drop", dropReq.name);
|
||||
mError("stream:%s not exist failed to drop it", dropReq.name);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pStream->smaId != 0) {
|
||||
mDebug("stream:%s, uid:0x%"PRIx64" try to drop sma related stream", dropReq.name, pStream->uid);
|
||||
|
||||
void *pIter = NULL;
|
||||
SSmaObj *pSma = NULL;
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
|
||||
|
@ -1241,13 +1245,21 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
if (pSma && pSma->uid == pStream->smaId) {
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED;
|
||||
|
||||
mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
|
||||
dropReq.name, pStream->uid, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
if (pSma) sdbRelease(pMnode->pSdb, pSma);
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
|
||||
|
||||
if (pSma) {
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
}
|
||||
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1267,7 +1279,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream");
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, pStream->uid, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
return -1;
|
||||
|
@ -1277,7 +1289,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
// drop all tasks
|
||||
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
||||
mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
|
@ -1303,10 +1315,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
// kill the related checkpoint trans
|
||||
int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
|
||||
if (transId != 0) {
|
||||
mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
|
||||
mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
|
||||
mndKillTransImpl(pMnode, transId, pStream->sourceDb);
|
||||
}
|
||||
|
||||
mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
|
||||
pStream->uid, transId);
|
||||
|
||||
removeStreamTasksInBuf(pStream, &execInfo);
|
||||
|
||||
SName name = {0};
|
||||
|
@ -1488,6 +1503,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
|
||||
|
||||
// sink_quota
|
||||
char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
sinkQuota[0] = '0';
|
||||
char dstStr[20] = {0};
|
||||
|
@ -1495,6 +1511,14 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
|
||||
|
||||
// checkpoint interval
|
||||
char tmp[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval);
|
||||
varDataSetLen(tmp, strlen(varDataVal(tmp)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
|
||||
|
||||
// checkpoint backup type
|
||||
char backup[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(backup, "none")
|
||||
|
|
|
@ -907,21 +907,34 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){
|
||||
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){
|
||||
// iter all vnode to delete handle
|
||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
||||
if(pReq == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pReq->head.vgId = htonl(pVgEp->vgId);
|
||||
pReq->vgId = pVgEp->vgId;
|
||||
pReq->consumerId = -1;
|
||||
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
||||
if (pVgObj == NULL) {
|
||||
taosMemoryFree(pReq);
|
||||
terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
STransAction action = {0};
|
||||
action.epSet = pVgEp->epSet;
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);;
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SMqVDeleteReq);
|
||||
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
|
||||
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
@ -1002,7 +1015,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
code = sendDeleteSubToVnode(pSub, pTrans);
|
||||
code = sendDeleteSubToVnode(pMnode, pSub, pTrans);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -1263,7 +1276,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
|||
goto END;
|
||||
}
|
||||
|
||||
code = sendDeleteSubToVnode(pSub, pTrans);
|
||||
code = sendDeleteSubToVnode(pMnode, pSub, pTrans);
|
||||
if (code != 0) {
|
||||
goto END;
|
||||
}
|
||||
|
|
|
@ -93,9 +93,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
streamMetaLoadAllTasks(pSnode->pMeta);
|
||||
|
||||
stopRsync();
|
||||
startRsync();
|
||||
|
|
|
@ -92,7 +92,7 @@ int32_t tqInitialize(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
|
||||
if (tqMetaTransform(pTq) < 0) {
|
||||
return -1;
|
||||
|
|
|
@ -333,8 +333,9 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
|||
}
|
||||
|
||||
while (j < newLen && k < oldLen) {
|
||||
SRow* pNewRow = taosArrayGetP(pNew->aRowP, j);
|
||||
SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
|
||||
SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
|
||||
SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
|
||||
|
||||
if (pNewRow->ts < pOldRow->ts) {
|
||||
taosArrayPush(pFinal, &pNewRow);
|
||||
j += 1;
|
||||
|
@ -373,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
|||
}
|
||||
|
||||
while (j < newLen) {
|
||||
SRow* pRow = taosArrayGetP(pNew->aRowP, j++);
|
||||
SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
|
||||
taosArrayPush(pFinal, &pRow);
|
||||
}
|
||||
|
||||
while (k < oldLen) {
|
||||
SRow* pRow = taosArrayGetP(pExisted->aRowP, k++);
|
||||
SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
|
||||
taosArrayPush(pFinal, &pRow);
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
|||
|
||||
SStreamSnapReader* pSnapReader = NULL;
|
||||
|
||||
if (streamSnapReaderOpen(meta, sver, chkpId, pTq->path, &pSnapReader) == 0) {
|
||||
if (streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader) == 0) {
|
||||
pReader->complete = 1;
|
||||
} else {
|
||||
code = -1;
|
||||
|
@ -128,7 +128,6 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
|||
int32_t code = 0;
|
||||
SStreamStateWriter* pWriter;
|
||||
|
||||
char tdir[TSDB_FILENAME_LEN * 2] = {0};
|
||||
// alloc
|
||||
pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||
if (pWriter == NULL) {
|
||||
|
@ -139,15 +138,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
|||
pWriter->sver = sver;
|
||||
pWriter->ever = ever;
|
||||
|
||||
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
|
||||
taosMkDir(tdir);
|
||||
taosMkDir(pTq->pStreamMeta->path);
|
||||
|
||||
SStreamSnapWriter* pSnapWriter = NULL;
|
||||
if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) {
|
||||
if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tdir);
|
||||
tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, pTq->pStreamMeta->path);
|
||||
pWriter->pWriterImpl = pSnapWriter;
|
||||
|
||||
*ppWriter = pWriter;
|
||||
|
@ -181,5 +179,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
|
|||
}
|
||||
|
||||
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
|
||||
return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
|
||||
streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ typedef struct SMStreamCheckpointReadyRspMsg {
|
|||
SMsgHead head;
|
||||
} SMStreamCheckpointReadyRspMsg;
|
||||
|
||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
|
||||
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
||||
ASSERT(pTask->info.fillHistory);
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
|
@ -490,16 +492,6 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
|
||||
}
|
||||
|
||||
static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) {
|
||||
*startCheckTs = pTask->execInfo.checkTs;
|
||||
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
*hasHTask = true;
|
||||
pId->streamId = pTask->hTaskInfo.id.streamId;
|
||||
pId->taskId = pTask->hTaskInfo.id.taskId;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
|
||||
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -772,13 +764,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
|||
int64_t el = taosGetTimestampMs() - st;
|
||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
|
||||
|
||||
code = streamMetaLoadAllTasks(pMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
|
||||
streamMetaWUnLock(pMeta);
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
streamMetaLoadAllTasks(pMeta);
|
||||
|
||||
{
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
|
@ -1053,10 +1039,9 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
|||
|
||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); }
|
||||
|
||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1539,7 +1539,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
|
|||
if (output->tbMeta) {
|
||||
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
|
||||
int32_t schemaExtSize = 0;
|
||||
if (useCompress(output->ctbMeta.tableType)) {
|
||||
if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) {
|
||||
schemaExtSize = output->tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt);
|
||||
}
|
||||
(*pOutput)->tbMeta = taosMemoryMalloc(metaSize + schemaExtSize);
|
||||
|
@ -1551,7 +1551,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
|
|||
}
|
||||
|
||||
memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
|
||||
if (useCompress(output->ctbMeta.tableType)) {
|
||||
if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) {
|
||||
(*pOutput)->tbMeta->schemaExt = (SSchemaExt *)((char *)(*pOutput)->tbMeta + metaSize);
|
||||
memcpy((*pOutput)->tbMeta->schemaExt, output->tbMeta->schemaExt, schemaExtSize);
|
||||
} else {
|
||||
|
|
|
@ -151,7 +151,7 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
|
|||
STR_TO_VARSTR(buf, "VIEW COL");
|
||||
}
|
||||
colDataSetVal(pCol4, pBlock->info.rows, buf, false);
|
||||
if (useCompress(pMeta->tableType)) {
|
||||
if (useCompress(pMeta->tableType) && pMeta->schemaExt) {
|
||||
if (i < pMeta->tableInfo.numOfColumns) {
|
||||
STR_TO_VARSTR(buf, columnEncodeStr(COMPRESS_L1_TYPE_U32(pMeta->schemaExt[i].compress)));
|
||||
colDataSetVal(pCol5, pBlock->info.rows, buf, false);
|
||||
|
@ -201,7 +201,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
|
|||
code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta, biMode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pDesc->pMeta && useCompress(pDesc->pMeta->tableType)) {
|
||||
if (pDesc->pMeta && useCompress(pDesc->pMeta->tableType) && pDesc->pMeta->schemaExt) {
|
||||
code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS_COMPRESS, pRsp);
|
||||
} else {
|
||||
code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp);
|
||||
|
@ -569,7 +569,7 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
|||
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||
}
|
||||
|
||||
if (useCompress(pCfg->tableType)) {
|
||||
if (useCompress(pCfg->tableType) && pCfg->pSchemaExt) {
|
||||
sprintf(type + strlen(type), " ENCODE \'%s\'",
|
||||
columnEncodeStr(COMPRESS_L1_TYPE_U32(pCfg->pSchemaExt[i].compress)));
|
||||
sprintf(type + strlen(type), " COMPRESS \'%s\'",
|
||||
|
|
|
@ -405,6 +405,13 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
|
|||
static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) {
|
||||
int32_t code =
|
||||
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->targetDbName, pStmt->targetTabName, pCxt->pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pSubtable && NULL != pStmt->pQuery) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable);
|
||||
if (NULL == pSelect->pSubtable) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
|
||||
}
|
||||
|
|
|
@ -1054,9 +1054,9 @@ static bool isPrimaryKey(STempTableNode* pTable, SNode* pExpr) {
|
|||
static bool hasPkInTable(const STableMeta* pTableMeta) {
|
||||
bool hasPK = pTableMeta->tableInfo.numOfColumns >= 2 && pTableMeta->schema[1].flags & COL_IS_KEY;
|
||||
if (hasPK) {
|
||||
uInfo("has primary key, %s", pTableMeta->schema[1].name);
|
||||
uDebug("has primary key, %s", pTableMeta->schema[1].name);
|
||||
} else {
|
||||
uInfo("no primary key, %s", pTableMeta->schema[1].name);
|
||||
uDebug("no primary key, %s", pTableMeta->schema[1].name);
|
||||
}
|
||||
return hasPK;
|
||||
}
|
||||
|
@ -9327,10 +9327,13 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea
|
|||
if (NULL == pStmt->pSubtable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable);
|
||||
if (NULL == pSelect->pSubtable) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable);
|
||||
if (NULL == pSelect->pSubtable) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
SRewriteSubtableCxt cxt = {.pCxt = pCxt, .pPartitionList = pSelect->pPartitionByList};
|
||||
nodesRewriteExpr(&pSelect->pSubtable, rewriteSubtable, &cxt);
|
||||
return pCxt->errCode;
|
||||
|
|
|
@ -1225,7 +1225,7 @@ STableCfg* tableCfgDup(STableCfg* pCfg) {
|
|||
SSchema* pSchema = taosMemoryMalloc(schemaSize);
|
||||
memcpy(pSchema, pCfg->pSchemas, schemaSize);
|
||||
SSchemaExt* pSchemaExt = NULL;
|
||||
if (useCompress(pCfg->tableType)) {
|
||||
if (useCompress(pCfg->tableType) && pCfg->pSchemaExt) {
|
||||
int32_t schemaExtSize = pCfg->numOfColumns * sizeof(SSchemaExt);
|
||||
pSchemaExt = taosMemoryMalloc(schemaExtSize);
|
||||
memcpy(pSchemaExt, pCfg->pSchemaExt, schemaExtSize);
|
||||
|
|
|
@ -482,7 +482,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
|||
|
||||
int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
|
||||
int32_t schemaExtSize = 0;
|
||||
if (useCompress(pSrc->tableType)) {
|
||||
if (useCompress(pSrc->tableType) && pSrc->schemaExt) {
|
||||
schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
|
||||
}
|
||||
*pDst = taosMemoryMalloc(metaSize + schemaExtSize);
|
||||
|
|
|
@ -454,7 +454,7 @@ int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta) {
|
|||
int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) {
|
||||
int32_t total = msg->numOfColumns + msg->numOfTags;
|
||||
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
|
||||
int32_t schemaExtSize = useCompress(msg->tableType) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
|
||||
int32_t schemaExtSize = (useCompress(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
|
||||
|
||||
STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize);
|
||||
if (NULL == pTableMeta) {
|
||||
|
@ -475,7 +475,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
|
|||
pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
|
||||
|
||||
memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
|
||||
if (useCompress(msg->tableType)) {
|
||||
if (useCompress(msg->tableType) && msg->pSchemaExt) {
|
||||
pTableMeta->schemaExt = pSchemaExt;
|
||||
memcpy(pSchemaExt, msg->pSchemaExt, schemaExtSize);
|
||||
} else {
|
||||
|
|
|
@ -157,9 +157,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE {
|
|||
|
||||
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
|
||||
|
||||
int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||
int32_t downloadCheckpoint(char* id, char* path);
|
||||
int32_t deleteCheckpoint(char* id);
|
||||
int32_t streamTaskDownloadCheckpointData(char* id, char* path);
|
||||
|
||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
||||
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
||||
|
|
|
@ -333,7 +333,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
|
|||
taosRemoveDir(defaultPath);
|
||||
}
|
||||
|
||||
code = downloadCheckpoint(key, chkpPath);
|
||||
code = streamTaskDownloadCheckpointData(key, chkpPath);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -342,7 +342,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
|
|||
return code;
|
||||
}
|
||||
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
||||
int32_t code = downloadCheckpoint(key, chkpPath);
|
||||
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId,
|
|||
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
|
||||
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
|
||||
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
|
||||
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
|
||||
|
||||
// check status
|
||||
|
@ -383,6 +384,8 @@ int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskI
|
|||
}
|
||||
|
||||
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
SStreamTaskCheckReq req = {
|
||||
.streamId = pTask->id.streamId,
|
||||
.upstreamTaskId = pTask->id.taskId,
|
||||
|
@ -397,7 +400,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId);
|
||||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
|
||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||
|
||||
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
||||
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
|
@ -412,8 +415,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
|
||||
p->reqId);
|
||||
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
|
||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
break;
|
||||
}
|
||||
|
@ -524,7 +526,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
|||
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
|
||||
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
|
||||
// of restart in timer thread will result in a dead lock.
|
||||
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
|
||||
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
|
||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||
if (pRunReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -614,8 +616,8 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug(
|
||||
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
||||
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||
"detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||
id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||
|
||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
@ -630,9 +632,9 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
if (pInfo->stopCheckProcess == 1) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug(
|
||||
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
|
||||
"fault:%d, timeout:%d, ready:%d ref:%d",
|
||||
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
|
||||
"notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||
id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||
|
||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
|
|
@ -29,7 +29,9 @@ typedef struct {
|
|||
} SAsyncUploadArg;
|
||||
|
||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||
static int32_t deleteCheckpointFile(char* id, char* name);
|
||||
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
||||
static int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||
static int32_t deleteCheckpoint(char* id);
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -658,9 +660,9 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t downloadCheckpoint(char* id, char* path) {
|
||||
int32_t streamTaskDownloadCheckpointData(char* id, char* path) {
|
||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||
stError("downloadCheckpoint parameters invalid");
|
||||
stError("streamTaskDownloadCheckpointData parameters invalid");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -686,7 +688,7 @@ int32_t deleteCheckpoint(char* id) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t deleteCheckpointFile(char* id, char* name) {
|
||||
int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||
char object[128] = {0};
|
||||
snprintf(object, sizeof(object), "%s/%s", id, name);
|
||||
char* tmp = object;
|
||||
|
|
|
@ -117,31 +117,22 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t chkpId;
|
||||
char* path;
|
||||
char* taskId;
|
||||
|
||||
SArray* pChkpSave;
|
||||
SArray* pChkpInUse;
|
||||
int8_t chkpCap;
|
||||
void* backend;
|
||||
|
||||
} StreamMetaTaskState;
|
||||
|
||||
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
|
||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) {
|
||||
stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
|
||||
return -1;
|
||||
// goto _err;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
||||
stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
|
||||
stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -231,17 +222,18 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
|
|||
if (compatible == STREAM_STATA_COMPATIBLE) {
|
||||
return 0;
|
||||
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
|
||||
stInfo("stream state need covert backend format");
|
||||
stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
|
||||
|
||||
return streamMetaCvtDbFormat(pMeta);
|
||||
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
|
||||
stError(
|
||||
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
||||
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
||||
"manually",
|
||||
tsDataDir);
|
||||
pMeta->vgId, tsDataDir);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -322,35 +314,43 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
}
|
||||
|
||||
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
|
||||
stError("vgId:%d convert sub info format failed, open stream meta failed", pMeta->vgId);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (streamMetaBegin(pMeta) < 0) {
|
||||
stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
|
||||
if (pMeta->pTasksMap == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->updateInfo.pTasks == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->startInfo.pReadyTaskSet == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->startInfo.pFailedTaskSet == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
|
||||
if (pMeta->pHbInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -373,8 +373,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
|
||||
pMeta->numOfPausedTasks = 0;
|
||||
pMeta->numOfStreamTasks = 0;
|
||||
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
|
||||
stage);
|
||||
|
||||
stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
|
||||
|
||||
pMeta->rid = taosAddRef(streamMetaId, pMeta);
|
||||
|
||||
|
@ -824,7 +824,8 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|||
return chkpId;
|
||||
}
|
||||
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||
// not allowed to return error code
|
||||
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||
TBC* pCur = NULL;
|
||||
void* pKey = NULL;
|
||||
int32_t kLen = 0;
|
||||
|
@ -833,7 +834,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
SDecoder decoder;
|
||||
|
||||
if (pMeta == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
||||
|
@ -844,7 +845,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
|
@ -937,7 +938,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
||||
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||
|
@ -1263,7 +1263,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
|||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->status.timerActive >= 1) {
|
||||
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart", pTask->id.idStr, pMeta->vgId);
|
||||
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
|
||||
streamTaskStop(pTask);
|
||||
inTimer = true;
|
||||
}
|
||||
}
|
||||
|
@ -1643,7 +1644,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
|||
|
||||
if (pStartInfo->startAllTasks != 1) {
|
||||
int64_t el = endTs - startTs;
|
||||
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
||||
stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
||||
pMeta->vgId, taskId, ready, el);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
|
@ -1651,7 +1652,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
|||
|
||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
||||
qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
||||
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ TEST(testCase, checkpointUpload_Test) {
|
|||
|
||||
TEST(testCase, checkpointDownload_Test) {
|
||||
char* id = "2013892036";
|
||||
// downloadCheckpoint(id, "/root/offset/download/");
|
||||
// streamTaskDownloadCheckpointData(id, "/root/offset/download/");
|
||||
}
|
||||
|
||||
TEST(testCase, checkpointDelete_Test) {
|
||||
|
|
|
@ -224,7 +224,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode, bool checkAccess) {
|
|||
#ifdef WINDOWS
|
||||
code = _mkdir(temp, mode);
|
||||
#elif defined(DARWIN)
|
||||
code = mkdir(dirname, 0777);
|
||||
code = mkdir(temp, 0777);
|
||||
#else
|
||||
code = mkdir(temp, mode);
|
||||
#endif
|
||||
|
|
|
@ -222,7 +222,7 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
||||
tdLog.info(len(tdSql.queryResult))
|
||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254))
|
||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(254, 255))
|
||||
|
||||
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
||||
tdSql.checkEqual(54, len(tdSql.queryResult))
|
||||
|
|
|
@ -488,6 +488,8 @@ int32_t shellReadCommand(char *command) {
|
|||
c = taosGetConsoleChar();
|
||||
|
||||
if (c == (char)EOF) {
|
||||
taosMemoryFreeClear(cmd.buffer);
|
||||
taosMemoryFreeClear(cmd.command);
|
||||
return c;
|
||||
}
|
||||
|
||||
|
@ -524,6 +526,8 @@ int32_t shellReadCommand(char *command) {
|
|||
case 4: // EOF or Ctrl+D
|
||||
taosResetTerminalMode();
|
||||
printf("\r\n");
|
||||
taosMemoryFreeClear(cmd.buffer);
|
||||
taosMemoryFreeClear(cmd.command);
|
||||
return -1;
|
||||
case 5: // ctrl E
|
||||
shellPositionCursorEnd(&cmd);
|
||||
|
|
Loading…
Reference in New Issue