Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-29130
This commit is contained in:
commit
85fec026d6
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
|||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||
|
||||
hint:
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT | PARTITION_FIRST | SMALLDATA_TS_SORT
|
||||
|
||||
select_list:
|
||||
select_expr [, select_expr] ...
|
||||
|
@ -94,6 +94,7 @@ The list of currently supported Hints is as follows:
|
|||
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
|
||||
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
|
||||
| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp |
|
||||
| SMALLDATA_TS_SORT| None | When sorting the supertable rows by timestamp, if the length of query columns >= 256, and there are relatively few rows, this hint can improve performance. | Sorting the supertable rows by timestamp |
|
||||
|
||||
For example:
|
||||
|
||||
|
@ -102,6 +103,7 @@ SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0
|
|||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||
SELECT /*+ SMALLDATA_TS_SORT() */ * from stable1 order by ts;
|
||||
```
|
||||
|
||||
## Lists
|
||||
|
|
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
|||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||
|
||||
hint:
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARTITION_FIRST | PARA_TABLES_SORT | SMALLDATA_TS_SORT
|
||||
|
||||
select_list:
|
||||
select_expr [, select_expr] ...
|
||||
|
@ -94,6 +94,8 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
|
|||
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
|
||||
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
|
||||
| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 |
|
||||
| SMALLDATA_TS_SORT| 无 | 超级表的数据按时间戳排序时, 查询列长度大于等于256, 但是行数不多, 使用这个提示, 可以提高性能 | 超级表的数据按时间戳排序时 |
|
||||
|
||||
举例:
|
||||
|
||||
```sql
|
||||
|
@ -101,6 +103,7 @@ SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0
|
|||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||
SELECT /*+ SMALLDATA_TS_SORT() */ * from stable1 order by ts;
|
||||
```
|
||||
|
||||
## 列表
|
||||
|
|
|
@ -49,6 +49,7 @@ window_clause: {
|
|||
| STATE_WINDOW(col)
|
||||
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
|
||||
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
|
||||
| COUNT_WINDOW(count_val[, sliding_val])
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -180,6 +181,19 @@ select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c
|
|||
|
||||

|
||||
|
||||
### 计数窗口
|
||||
|
||||
计数窗口按固定的数据行数来划分窗口。默认将数据按时间戳排序,再按照count_val的值,将数据划分为多个窗口,然后做聚合计算。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。sliding_val是常量,表示窗口滑动的数量,类似于 interval的SLIDING。
|
||||
|
||||
以下面的 SQL 语句为例,计数窗口切分如图所示:
|
||||
```sql
|
||||
select _wstart, _wend, count(*) from t count_window(4);
|
||||
```
|
||||
|
||||

|
||||
|
||||
|
||||
|
||||
### 时间戳伪列
|
||||
|
||||
窗口聚合查询结果中,如果 SQL 语句中没有指定输出查询结果中的时间戳列,那么最终结果中不会自动包含窗口的时间列信息。如果需要在结果中输出聚合结果所对应的时间窗口信息,需要在 SELECT 子句中使用时间戳相关的伪列: 时间窗口起始时间 (\_WSTART), 时间窗口结束时间 (\_WEND), 时间窗口持续时间 (\_WDURATION), 以及查询整体窗口相关的伪列: 查询窗口起始时间(\_QSTART) 和查询窗口结束时间(\_QEND)。需要注意的是时间窗口起始时间和结束时间均是闭区间,时间窗口持续时间是数据当前时间分辨率下的数值。例如,如果当前数据库的时间分辨率是毫秒,那么结果中 500 就表示当前时间窗口的持续时间是 500毫秒 (500 ms)。
|
||||
|
|
|
@ -380,7 +380,7 @@
|
|||
#define TK_SORT_FOR_GROUP 608
|
||||
#define TK_PARTITION_FIRST 609
|
||||
#define TK_PARA_TABLES_SORT 610
|
||||
|
||||
#define TK_SMALLDATA_TS_SORT 611
|
||||
|
||||
#define TK_NK_NIL 65535
|
||||
|
||||
|
|
|
@ -223,10 +223,10 @@ typedef struct SStoreTqReader {
|
|||
bool (*tqReaderCurrentBlockConsumed)();
|
||||
|
||||
struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it
|
||||
int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
||||
// int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
||||
|
||||
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
|
||||
bool (*tqReaderNextBlockFilterOut)();
|
||||
// bool (*tqReaderNextBlockFilterOut)();
|
||||
} SStoreTqReader;
|
||||
|
||||
typedef struct SStoreSnapshotFn {
|
||||
|
|
|
@ -122,6 +122,7 @@ typedef struct SScanLogicNode {
|
|||
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
|
||||
SArray* pFuncTypes; // for last, last_row
|
||||
bool paraTablesSort; // for table merge scan
|
||||
bool smallDataTsSort; // disable row id sort for table merge scan
|
||||
} SScanLogicNode;
|
||||
|
||||
typedef struct SJoinLogicNode {
|
||||
|
@ -445,6 +446,7 @@ typedef struct STableScanPhysiNode {
|
|||
bool filesetDelimited;
|
||||
bool needCountEmptyTable;
|
||||
bool paraTablesSort;
|
||||
bool smallDataTsSort;
|
||||
} STableScanPhysiNode;
|
||||
|
||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||
|
|
|
@ -128,7 +128,8 @@ typedef enum EHintOption {
|
|||
HINT_BATCH_SCAN,
|
||||
HINT_SORT_FOR_GROUP,
|
||||
HINT_PARTITION_FIRST,
|
||||
HINT_PARA_TABLES_SORT
|
||||
HINT_PARA_TABLES_SORT,
|
||||
HINT_SMALLDATA_TS_SORT,
|
||||
} EHintOption;
|
||||
|
||||
typedef struct SHintNode {
|
||||
|
|
|
@ -62,6 +62,7 @@ typedef struct SRpcHandleInfo {
|
|||
|
||||
SRpcConnInfo conn;
|
||||
int8_t forbiddenIp;
|
||||
int8_t notFreeAhandle;
|
||||
|
||||
} SRpcHandleInfo;
|
||||
|
||||
|
|
|
@ -119,6 +119,13 @@ int32_t taosSetFileHandlesLimit();
|
|||
|
||||
int32_t taosLinkFile(char *src, char *dst);
|
||||
|
||||
FILE* taosOpenCFile(const char* filename, const char* mode);
|
||||
int taosSeekCFile(FILE* file, int64_t offset, int whence);
|
||||
size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream );
|
||||
size_t taosWriteToCFile(const void* ptr, size_t size, size_t nitems, FILE* stream);
|
||||
int taosCloseCFile(FILE *);
|
||||
int taosSetAutoDelFile(char* path);
|
||||
|
||||
bool lastErrorIsFileNotExist();
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -298,8 +298,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
|||
|
||||
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
||||
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
|
||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
|
||||
bool freeAfterUse);
|
||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
|
||||
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
|
||||
bool convertUcs4);
|
||||
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
||||
|
|
|
@ -302,7 +302,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
int8_t biMode = atomic_load_8(&pRequest->pTscObj->biMode);
|
||||
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -341,7 +341,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
|
||||
atomic_load_8(&pRequest->pTscObj->biMode));
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
|
||||
}
|
||||
|
||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||
|
@ -1721,7 +1721,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
|||
}
|
||||
|
||||
pRequest->code =
|
||||
setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
|
||||
setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
return NULL;
|
||||
|
@ -2180,15 +2180,13 @@ void resetConnectDB(STscObj* pTscObj) {
|
|||
taosThreadMutexUnlock(&pTscObj->mutex);
|
||||
}
|
||||
|
||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
|
||||
bool freeAfterUse) {
|
||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
|
||||
if (pResultInfo == NULL || pRsp == NULL) {
|
||||
tscError("setQueryResultFromRsp paras is null");
|
||||
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (freeAfterUse) taosMemoryFreeClear(pResultInfo->pRspMsg);
|
||||
|
||||
taosMemoryFreeClear(pResultInfo->pRspMsg);
|
||||
pResultInfo->pRspMsg = (const char*)pRsp;
|
||||
pResultInfo->pData = (void*)pRsp->data;
|
||||
pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
|
||||
|
@ -2618,7 +2616,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
|
|||
}
|
||||
|
||||
pRequest->code =
|
||||
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, true);
|
||||
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->code = code;
|
||||
|
|
|
@ -350,7 +350,6 @@ void taos_free_result(TAOS_RES *res) {
|
|||
taosArrayDestroy(pRsp->rsp.createTableLen);
|
||||
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
|
||||
|
||||
pRsp->resInfo.pRspMsg = NULL;
|
||||
doFreeReqResultInfo(&pRsp->resInfo);
|
||||
taosMemoryFree(pRsp);
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
|
@ -359,7 +358,6 @@ void taos_free_result(TAOS_RES *res) {
|
|||
taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
||||
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
|
||||
pRsp->resInfo.pRspMsg = NULL;
|
||||
doFreeReqResultInfo(&pRsp->resInfo);
|
||||
taosMemoryFree(pRsp);
|
||||
} else if (TD_RES_TMQ_META(res)) {
|
||||
|
|
|
@ -538,7 +538,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
code = buildShowVariablesRsp(rsp.variables, &pRes);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -651,7 +651,7 @@ int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
|
||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
|
|
|
@ -2628,13 +2628,9 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
|
|||
SRetrieveTableRspForTmq* pRetrieveTmq =
|
||||
(SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
|
||||
if (pRspObj->rsp.withSchema) {
|
||||
doFreeReqResultInfo(&pRspObj->resInfo);
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
|
||||
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
|
||||
taosMemoryFreeClear(pRspObj->resInfo.row);
|
||||
taosMemoryFreeClear(pRspObj->resInfo.pCol);
|
||||
taosMemoryFreeClear(pRspObj->resInfo.length);
|
||||
taosMemoryFreeClear(pRspObj->resInfo.convertBuf);
|
||||
taosMemoryFreeClear(pRspObj->resInfo.convertJson);
|
||||
}
|
||||
|
||||
pRspObj->resInfo.pData = (void*)pRetrieveTmq->data;
|
||||
|
|
|
@ -502,7 +502,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, CFG_SCOPE_CLIENT,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
@ -538,8 +538,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
return -1;
|
||||
if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -600,18 +600,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
return -1;
|
||||
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
|
||||
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
|
||||
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
|
||||
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
tsNumOfCommitThreads = tsNumOfCores / 2;
|
||||
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
|
@ -691,9 +679,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "grantMode", tsMndGrantMode, 0, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
@ -707,7 +692,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
|
||||
|
@ -814,8 +798,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
return -1;
|
||||
if (cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
|
||||
|
||||
// GRANT_CFG_ADD;
|
||||
return 0;
|
||||
}
|
||||
|
@ -1353,6 +1335,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
|
|||
if (taosAddClientLogCfg(tsCfg) != 0) return -1;
|
||||
if (taosAddServerLogCfg(tsCfg) != 0) return -1;
|
||||
}
|
||||
|
||||
taosAddSystemCfg(tsCfg);
|
||||
|
||||
if (taosLoadCfg(tsCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
|
||||
|
@ -1379,7 +1362,9 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
|
|||
if (taosSetTfsCfg(tsCfg) != 0) return -1;
|
||||
if (taosSetS3Cfg(tsCfg) != 0) return -1;
|
||||
}
|
||||
|
||||
taosSetSystemCfg(tsCfg);
|
||||
|
||||
if (taosSetFileHandlesLimit() != 0) return -1;
|
||||
|
||||
taosSetAllDebugFlag(tsCfg, cfgGetItem(tsCfg, "debugFlag")->i32);
|
||||
|
@ -1626,6 +1611,10 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
tsLogSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024);
|
||||
uInfo("%s set to %" PRId64, name, tsLogSpace.reserved);
|
||||
matched = true;
|
||||
} else if (strcasecmp("monitor", name) == 0) {
|
||||
tsEnableMonitor = pItem->bval;
|
||||
uInfo("%s set to %d", name, tsEnableMonitor);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -115,6 +115,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
|||
|
||||
char buf[256] = {0};
|
||||
EPSET_TO_STR(&entry.epset, buf);
|
||||
|
||||
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||
taosArrayPush(pVgroupListSnapshot, &entry);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
|
@ -300,7 +301,10 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
|||
return code;
|
||||
}
|
||||
|
||||
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
|
||||
char buf[256] = {0};
|
||||
EPSET_TO_STR(&epset, buf);
|
||||
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
|
||||
|
||||
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
|
|
|
@ -15,11 +15,11 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndTrans.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndSync.h"
|
||||
#include "mndUser.h"
|
||||
|
||||
|
@ -801,16 +801,17 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|||
if (pNew->conflict == TRN_CONFLICT_TOPIC) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
||||
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
}
|
||||
}
|
||||
if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_TOPIC ) {
|
||||
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
|
||||
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
}
|
||||
if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) conflict = true;
|
||||
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
|
||||
conflict = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -847,7 +848,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||
if(pTrans == NULL) return -1;
|
||||
if (pTrans == NULL) return -1;
|
||||
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
return -1;
|
||||
|
@ -1142,6 +1143,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
|||
return -1;
|
||||
}
|
||||
rpcMsg.info.traceId.rootId = pTrans->mTraceId;
|
||||
rpcMsg.info.notFreeAhandle = 1;
|
||||
|
||||
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
||||
|
||||
|
@ -1156,7 +1158,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
|||
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
|
||||
if (code == 0) {
|
||||
pAction->msgSent = 1;
|
||||
//pAction->msgReceived = 0;
|
||||
// pAction->msgReceived = 0;
|
||||
pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
|
||||
|
||||
|
@ -1253,16 +1255,16 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
|||
|
||||
for (int32_t action = 0; action < numOfActions; ++action) {
|
||||
STransAction *pAction = taosArrayGet(pArray, action);
|
||||
mDebug("trans:%d, %s:%d Sent:%d, Received:%d, errCode:0x%x, acceptableCode:0x%x, retryCode:0x%x",
|
||||
pTrans->id, mndTransStr(pAction->stage), pAction->id, pAction->msgSent, pAction->msgReceived,
|
||||
pAction->errCode, pAction->acceptableCode, pAction->retryCode);
|
||||
mDebug("trans:%d, %s:%d Sent:%d, Received:%d, errCode:0x%x, acceptableCode:0x%x, retryCode:0x%x", pTrans->id,
|
||||
mndTransStr(pAction->stage), pAction->id, pAction->msgSent, pAction->msgReceived, pAction->errCode,
|
||||
pAction->acceptableCode, pAction->retryCode);
|
||||
if (pAction->msgSent) {
|
||||
if (pAction->msgReceived) {
|
||||
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
||||
mndTransResetAction(pMnode, pTrans, pAction);
|
||||
mInfo("trans:%d, %s:%d reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
|
|
@ -368,24 +368,11 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
}
|
||||
}
|
||||
|
||||
// todo ignore the error in wal?
|
||||
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||
SWalReader* pWalReader = pReader->pWalReader;
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
|
||||
uint64_t st = taosGetTimestampMs();
|
||||
while (1) {
|
||||
// try next message in wal file
|
||||
if (walNextValidMsg(pWalReader) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
int64_t ver = pWalReader->pHead->head.version;
|
||||
|
||||
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
||||
pReader->nextBlk = 0;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||
while (pReader->nextBlk < numOfBlocks) {
|
||||
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
|
||||
|
@ -400,33 +387,32 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
|||
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
||||
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
|
||||
if (pDataBlock == NULL) {
|
||||
pDataBlock = createOneDataBlock(pRes, true);
|
||||
} else {
|
||||
blockDataMerge(pDataBlock, pRes);
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
pReader->nextBlk += 1;
|
||||
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
|
||||
}
|
||||
}
|
||||
|
||||
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
||||
pReader->msg.msgStr = NULL;
|
||||
|
||||
if (pDataBlock != NULL) {
|
||||
blockDataCleanup(pReader->pResBlock);
|
||||
copyDataBlock(pReader->pResBlock, pDataBlock);
|
||||
blockDataDestroy(pDataBlock);
|
||||
return true;
|
||||
} else {
|
||||
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
|
||||
}
|
||||
|
||||
if (taosGetTimestampMs() - st > 1000) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// try next message in wal file
|
||||
if (walNextValidMsg(pWalReader) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
int64_t ver = pWalReader->pHead->head.version;
|
||||
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
||||
pReader->nextBlk = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -128,12 +128,12 @@ void initTqAPI(SStoreTqReader* pTq) {
|
|||
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
|
||||
|
||||
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
|
||||
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
|
||||
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
|
||||
pTq->tqGetResultBlock = tqGetResultBlock;
|
||||
|
||||
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
||||
|
||||
pTq->tqGetStreamExecProgress = tqGetStreamExecInfo;
|
||||
|
|
|
@ -352,6 +352,8 @@ typedef struct STableMergeScanInfo {
|
|||
SSDataBlock* nextDurationBlocks[2];
|
||||
bool rtnNextDurationBlocks;
|
||||
int32_t nextDurationBlocksIdx;
|
||||
|
||||
bool bSortRowId;
|
||||
|
||||
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
|
||||
} STableMergeScanInfo;
|
||||
|
|
|
@ -194,6 +194,9 @@ void tsortSetClosed(SSortHandle* pHandle);
|
|||
void tsortSetSingleTableMerge(SSortHandle* pHandle);
|
||||
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
|
||||
|
||||
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsSize);
|
||||
|
||||
void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
/**
|
||||
* @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen
|
||||
* @param [in] pSortCols cols to comp and build
|
||||
|
|
|
@ -269,6 +269,7 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) {
|
|||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
|
||||
SSDataBlock* pBlock = NULL;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
|
||||
|
||||
|
@ -278,13 +279,18 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) {
|
|||
if (NULL == pBlock) {
|
||||
TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
|
||||
pNonSortMerge->sourceWorkIdx++;
|
||||
idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx);
|
||||
idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
if (!pBlock) {
|
||||
return NULL;
|
||||
}
|
||||
copyDataBlock(pRes, pBlock);
|
||||
|
||||
return pRes;
|
||||
}
|
||||
|
||||
void destroyNonSortMergeOperatorInfo(void* param) {
|
||||
|
@ -491,6 +497,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
}
|
||||
case MERGE_TYPE_NON_SORT: {
|
||||
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
|
||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
break;
|
||||
}
|
||||
case MERGE_TYPE_COLUMNS: {
|
||||
|
|
|
@ -28,6 +28,7 @@ typedef struct SProjectOperatorInfo {
|
|||
bool mergeDataBlocks;
|
||||
SSDataBlock* pFinalRes;
|
||||
bool inputIgnoreGroup;
|
||||
bool outputIgnoreGroup;
|
||||
} SProjectOperatorInfo;
|
||||
|
||||
typedef struct SIndefOperatorInfo {
|
||||
|
@ -111,6 +112,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
|||
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
|
||||
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
|
||||
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
|
||||
pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;
|
||||
|
||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
|
||||
pInfo->mergeDataBlocks = false;
|
||||
|
@ -276,6 +278,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (pProjectInfo->outputIgnoreGroup) {
|
||||
pRes->info.id.groupId = 0;
|
||||
}
|
||||
|
||||
return (pRes->info.rows > 0) ? pRes : NULL;
|
||||
}
|
||||
|
||||
|
@ -385,6 +391,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
if (pProjectInfo->outputIgnoreGroup) {
|
||||
p->info.id.groupId = 0;
|
||||
}
|
||||
|
||||
return (p->info.rows > 0) ? p : NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -2269,6 +2269,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
qInfo("===stream===stream scan is killed. task id:%s, code %s", id, tstrerror(pTaskInfo->code));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -3870,6 +3871,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
|
|||
|
||||
uint32_t status = 0;
|
||||
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -3956,7 +3958,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
|||
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
|
||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
@ -4008,9 +4012,16 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
|
||||
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||
if (pInfo->bSortRowId && numOfTable != 1) {
|
||||
int32_t memSize = 512 * 1024 * 1024;
|
||||
code = tsortSetSortByRowId(pInfo->pSortHandle, memSize);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
|
||||
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
|
||||
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
|
||||
|
@ -4047,6 +4058,7 @@ void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
|
||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||
pInfo->pSortHandle = NULL;
|
||||
|
||||
}
|
||||
|
||||
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||
|
@ -4131,8 +4143,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
|||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(pResBlock, pTupleHandle);
|
||||
tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle);
|
||||
if (pResBlock->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
|
@ -4199,7 +4210,10 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
|||
} else {
|
||||
if (pInfo->bNewFilesetEvent) {
|
||||
stopDurationForGroupTableMergeScan(pOperator);
|
||||
startDurationForGroupTableMergeScan(pOperator);
|
||||
code = startDurationForGroupTableMergeScan(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
} else {
|
||||
// Data of this group are all dumped, let's try the next group
|
||||
stopGroupTableMergeScan(pOperator);
|
||||
|
@ -4330,10 +4344,15 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
|
||||
pInfo->mSkipTables = NULL;
|
||||
}
|
||||
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
if (!hasLimit && blockDataGetRowSize(pInfo->pResBlock) >= 256 && !pTableScanNode->smallDataTsSort) {
|
||||
pInfo->bSortRowId = true;
|
||||
} else {
|
||||
pInfo->bSortRowId = false;
|
||||
}
|
||||
|
||||
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
|
||||
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||
|
@ -4342,6 +4361,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
|
||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
||||
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
||||
|
||||
//start one reader variable
|
||||
|
|
|
@ -1281,8 +1281,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
}
|
||||
qInfo("%s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -4332,7 +4332,8 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->pUpdatedMap = NULL;
|
||||
}
|
||||
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
|
|
@ -32,6 +32,32 @@ struct STupleHandle {
|
|||
int32_t rowIndex;
|
||||
};
|
||||
|
||||
typedef struct SSortMemFileRegion {
|
||||
int64_t fileOffset;
|
||||
int32_t regionSize;
|
||||
|
||||
int32_t bufRegOffset;
|
||||
int32_t bufLen;
|
||||
char* buf;
|
||||
} SSortMemFileRegion;
|
||||
|
||||
typedef struct SSortMemFile {
|
||||
char* writeBuf;
|
||||
int32_t writeBufSize;
|
||||
int64_t writeFileOffset;
|
||||
|
||||
int32_t currRegionId;
|
||||
int32_t currRegionOffset;
|
||||
bool bRegionDirty;
|
||||
|
||||
SArray* aFileRegions;
|
||||
int32_t cacheSize;
|
||||
int32_t blockSize;
|
||||
|
||||
FILE* pTdFile;
|
||||
char memFilePath[PATH_MAX];
|
||||
} SSortMemFile;
|
||||
|
||||
struct SSortHandle {
|
||||
int32_t type;
|
||||
int32_t pageSize;
|
||||
|
@ -76,10 +102,21 @@ struct SSortHandle {
|
|||
bool (*abortCheckFn)(void* param);
|
||||
void* abortCheckParam;
|
||||
|
||||
bool bSortByRowId;
|
||||
SSortMemFile* pExtRowsMemFile;
|
||||
int32_t extRowBytes;
|
||||
int32_t extRowsPageSize;
|
||||
int32_t extRowsMemSize;
|
||||
int32_t srcTsSlotId;
|
||||
SBlockOrderInfo extRowsOrderInfo;
|
||||
|
||||
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
|
||||
void* mergeLimitReachedParam;
|
||||
};
|
||||
|
||||
static int32_t destroySortMemFile(SSortHandle* pHandle);
|
||||
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
|
||||
char** ppRow, bool* pFreeRow);
|
||||
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
|
||||
pHandle->singleTableMerge = true;
|
||||
}
|
||||
|
@ -189,6 +226,7 @@ void destroyTuple(void* t) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param type
|
||||
|
@ -202,7 +240,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
|
|||
pSortHandle->type = type;
|
||||
pSortHandle->pageSize = pageSize;
|
||||
pSortHandle->numOfPages = numOfPages;
|
||||
pSortHandle->pSortInfo = pSortInfo;
|
||||
pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
|
||||
pSortHandle->loops = 0;
|
||||
|
||||
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
|
||||
|
@ -305,6 +343,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
|||
qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
|
||||
|
||||
taosArrayDestroy(pSortHandle->pOrderedSource);
|
||||
if (pSortHandle->pExtRowsMemFile != NULL) {
|
||||
destroySortMemFile(pSortHandle);
|
||||
}
|
||||
taosArrayDestroy(pSortHandle->pSortInfo);
|
||||
taosMemoryFreeClear(pSortHandle);
|
||||
}
|
||||
|
||||
|
@ -851,6 +893,389 @@ static int32_t createPageBuf(SSortHandle* pHandle) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||
if (pHandle->bSortByRowId) {
|
||||
int32_t regionId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
|
||||
int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2);
|
||||
int32_t length = *(int32_t*)tsortGetValue(pTupleHandle, 3);
|
||||
|
||||
char* buf = NULL;
|
||||
bool bFreeRow = false;
|
||||
getRowBufFromExtMemFile(pHandle, regionId, offset, length, &buf, &bFreeRow);
|
||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
char* isNull = (char*)buf;
|
||||
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
if (!isNull[i]) {
|
||||
colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
|
||||
if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t dataLen = getJsonValueLen(pStart);
|
||||
pStart += dataLen;
|
||||
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||
pStart += varDataTLen(pStart);
|
||||
} else {
|
||||
int32_t bytes = pColInfo->info.bytes;
|
||||
pStart += bytes;
|
||||
}
|
||||
} else {
|
||||
colDataSetNULL(pColInfo, pBlock->info.rows);
|
||||
}
|
||||
}
|
||||
if (bFreeRow) {
|
||||
taosMemoryFree(buf);
|
||||
}
|
||||
if (*(int32_t*)pStart != pStart - buf) {
|
||||
qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart,
|
||||
(int32_t)(pStart - buf));
|
||||
};
|
||||
|
||||
pBlock->info.dataLoad = 1;
|
||||
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
|
||||
pBlock->info.rows += 1;
|
||||
|
||||
} else {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
bool isNull = tsortIsNullVal(pTupleHandle, i);
|
||||
if (isNull) {
|
||||
colDataSetNULL(pColInfo, pBlock->info.rows);
|
||||
} else {
|
||||
char* pData = tsortGetValue(pTupleHandle, i);
|
||||
if (pData != NULL) {
|
||||
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.dataLoad = 1;
|
||||
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
|
||||
pBlock->info.rows += 1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
||||
char* isNull = (char*)buf;
|
||||
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (colDataIsNull_s(pCol, rowIdx)) {
|
||||
isNull[i] = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
isNull[i] = 0;
|
||||
char* pData = colDataGetData(pCol, rowIdx);
|
||||
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
if (pCol->pData) {
|
||||
int32_t dataLen = getJsonValueLen(pData);
|
||||
memcpy(pStart, pData, dataLen);
|
||||
pStart += dataLen;
|
||||
} else {
|
||||
// the column that is pre-allocated has no data and has offset
|
||||
*pStart = 0;
|
||||
pStart += 1;
|
||||
}
|
||||
} else if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
if (pCol->pData) {
|
||||
varDataCopy(pStart, pData);
|
||||
pStart += varDataTLen(pData);
|
||||
} else {
|
||||
// the column that is pre-allocated has no data and has offset
|
||||
*(VarDataLenT*)(pStart) = 0;
|
||||
pStart += VARSTR_HEADER_SIZE;
|
||||
}
|
||||
} else {
|
||||
int32_t bytes = pCol->info.bytes;
|
||||
memcpy(pStart, pData, bytes);
|
||||
pStart += bytes;
|
||||
}
|
||||
}
|
||||
*(int32_t*)pStart = (char*)pStart - (char*)buf;
|
||||
pStart += sizeof(int32_t);
|
||||
return (int32_t)(pStart - (char*)buf);
|
||||
}
|
||||
|
||||
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
|
||||
char** ppRow, bool* pFreeRow) {
|
||||
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
||||
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId);
|
||||
if (pRegion->buf == NULL) {
|
||||
pRegion->bufRegOffset = 0;
|
||||
pRegion->buf = taosMemoryMalloc(pMemFile->blockSize);
|
||||
if (pRegion->buf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
|
||||
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
|
||||
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
|
||||
if (ret != 1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
pRegion->bufLen = readBytes;
|
||||
}
|
||||
ASSERT(pRegion->bufRegOffset <= tupleOffset);
|
||||
if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) {
|
||||
*pFreeRow = false;
|
||||
*ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
|
||||
} else {
|
||||
*ppRow = taosMemoryMalloc(rowLen);
|
||||
if (*ppRow == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset);
|
||||
memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);
|
||||
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET);
|
||||
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
|
||||
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
|
||||
if (ret != 1) {
|
||||
taosMemoryFreeClear(*ppRow);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
|
||||
*pFreeRow = true;
|
||||
pRegion->bufRegOffset += pRegion->bufLen;
|
||||
pRegion->bufLen = readBytes;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createSortMemFile(SSortHandle* pHandle) {
|
||||
if (pHandle->pExtRowsMemFile != NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
|
||||
if (pMemFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
|
||||
pMemFile->pTdFile = taosOpenCFile(pMemFile->memFilePath, "w+");
|
||||
if (pMemFile->pTdFile == NULL) {
|
||||
code = terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
taosSetAutoDelFile(pMemFile->memFilePath);
|
||||
|
||||
pMemFile->currRegionId = -1;
|
||||
pMemFile->currRegionOffset = -1;
|
||||
|
||||
pMemFile->writeBufSize = 4 * 1024 * 1024;
|
||||
pMemFile->writeFileOffset = -1;
|
||||
pMemFile->bRegionDirty = false;
|
||||
|
||||
pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize);
|
||||
if (pMemFile->writeBuf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pMemFile->cacheSize = pHandle->extRowsMemSize;
|
||||
pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion));
|
||||
if (pMemFile->aFileRegions == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pHandle->pExtRowsMemFile = pMemFile;
|
||||
} else {
|
||||
if (pMemFile) {
|
||||
if (pMemFile->aFileRegions) taosMemoryFreeClear(pMemFile->aFileRegions);
|
||||
if (pMemFile->writeBuf) taosMemoryFreeClear(pMemFile->writeBuf);
|
||||
if (pMemFile->pTdFile) {
|
||||
taosCloseCFile(pMemFile->pTdFile);
|
||||
pMemFile->pTdFile = NULL;
|
||||
}
|
||||
taosMemoryFreeClear(pMemFile);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t destroySortMemFile(SSortHandle* pHandle) {
|
||||
if (pHandle->pExtRowsMemFile == NULL) return TSDB_CODE_SUCCESS;
|
||||
|
||||
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pMemFile->aFileRegions); ++i) {
|
||||
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i);
|
||||
taosMemoryFree(pRegion->buf);
|
||||
}
|
||||
taosArrayDestroy(pMemFile->aFileRegions);
|
||||
pMemFile->aFileRegions = NULL;
|
||||
|
||||
taosMemoryFree(pMemFile->writeBuf);
|
||||
pMemFile->writeBuf = NULL;
|
||||
|
||||
taosCloseCFile(pMemFile->pTdFile);
|
||||
pMemFile->pTdFile = NULL;
|
||||
taosRemoveFile(pMemFile->memFilePath);
|
||||
taosMemoryFree(pMemFile);
|
||||
pHandle->pExtRowsMemFile = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsortOpenRegion(SSortHandle* pHandle) {
|
||||
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
||||
if (pMemFile->currRegionId == -1) {
|
||||
SSortMemFileRegion region = {0};
|
||||
region.fileOffset = 0;
|
||||
region.bufRegOffset = 0;
|
||||
taosArrayPush(pMemFile->aFileRegions, ®ion);
|
||||
pMemFile->currRegionId = 0;
|
||||
pMemFile->currRegionOffset = 0;
|
||||
pMemFile->writeFileOffset = 0;
|
||||
} else {
|
||||
SSortMemFileRegion regionNew = {0};
|
||||
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
|
||||
regionNew.fileOffset = pRegion->fileOffset + pRegion->regionSize;
|
||||
regionNew.bufRegOffset = 0;
|
||||
taosArrayPush(pMemFile->aFileRegions, ®ionNew);
|
||||
++pMemFile->currRegionId;
|
||||
pMemFile->currRegionOffset = 0;
|
||||
pMemFile->writeFileOffset = regionNew.fileOffset;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsortCloseRegion(SSortHandle* pHandle) {
|
||||
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
||||
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
|
||||
pRegion->regionSize = pMemFile->currRegionOffset;
|
||||
int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset);
|
||||
if (writeBytes > 0) {
|
||||
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
|
||||
if (ret != 1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
pMemFile->bRegionDirty = false;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
|
||||
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
||||
size_t numRegions = taosArrayGetSize(pMemFile->aFileRegions);
|
||||
ASSERT(numRegions == (pMemFile->currRegionId + 1));
|
||||
if (numRegions == 0) return TSDB_CODE_SUCCESS;
|
||||
int32_t blockReadBytes = (pMemFile->cacheSize / numRegions + 4095) & ~4095;
|
||||
pMemFile->blockSize = blockReadBytes;
|
||||
|
||||
for (int32_t i = 0; i < numRegions; ++i) {
|
||||
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i);
|
||||
pRegion->bufRegOffset = 0;
|
||||
}
|
||||
taosMemoryFree(pMemFile->writeBuf);
|
||||
pMemFile->writeBuf = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
|
||||
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
||||
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
|
||||
{
|
||||
if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) {
|
||||
int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
|
||||
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
|
||||
if (ret != 1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset;
|
||||
}
|
||||
}
|
||||
*pRegionId = pMemFile->currRegionId;
|
||||
*pOffset = pMemFile->currRegionOffset;
|
||||
int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
|
||||
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset);
|
||||
*pLength = blockLen;
|
||||
pMemFile->currRegionOffset += blockLen;
|
||||
pMemFile->bRegionDirty = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
|
||||
int32_t pageId = -1;
|
||||
int32_t offset = -1;
|
||||
int32_t length = -1;
|
||||
saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &pageId, &offset, &length);
|
||||
|
||||
SSDataBlock* pBlock = pHandle->pDataBlock;
|
||||
SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId);
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
char* pData = colDataGetData(pSrcTsCol, *rowIndex);
|
||||
colDataSetVal(pTsCol, pBlock->info.rows, pData, false);
|
||||
|
||||
SColumnInfoData* pRegionIdCol = taosArrayGet(pBlock->pDataBlock, 1);
|
||||
colDataSetInt32(pRegionIdCol, pBlock->info.rows, &pageId);
|
||||
|
||||
SColumnInfoData* pOffsetCol = taosArrayGet(pBlock->pDataBlock, 2);
|
||||
colDataSetInt32(pOffsetCol, pBlock->info.rows, &offset);
|
||||
|
||||
SColumnInfoData* pLengthCol = taosArrayGet(pBlock->pDataBlock, 3);
|
||||
colDataSetInt32(pLengthCol, pBlock->info.rows, &length);
|
||||
|
||||
pBlock->info.rows += 1;
|
||||
*rowIndex += 1;
|
||||
}
|
||||
|
||||
static void initRowIdSort(SSortHandle* pHandle) {
|
||||
|
||||
SSDataBlock* pSortInput = createDataBlock();
|
||||
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
|
||||
blockDataAppendColInfo(pSortInput, &tsCol);
|
||||
SColumnInfoData regionIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
|
||||
blockDataAppendColInfo(pSortInput, ®ionIdCol);
|
||||
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
|
||||
blockDataAppendColInfo(pSortInput, &offsetCol);
|
||||
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
|
||||
blockDataAppendColInfo(pSortInput, &lengthCol);
|
||||
|
||||
blockDataDestroy(pHandle->pDataBlock);
|
||||
pHandle->pDataBlock = pSortInput;
|
||||
|
||||
int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
|
||||
size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
|
||||
pHandle->pageSize = 256 * 1024; // 256k
|
||||
pHandle->numOfPages = 256;
|
||||
|
||||
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
||||
SBlockOrderInfo bi = {0};
|
||||
bi.order = pOrder->order;
|
||||
bi.slotId = 0;
|
||||
bi.nullFirst = NULL_ORDER_FIRST;
|
||||
|
||||
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||
taosArrayPush(aOrder, &bi);
|
||||
|
||||
taosArrayDestroy(pHandle->pSortInfo);
|
||||
pHandle->pSortInfo = aOrder;
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
|
||||
pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
|
||||
pHandle->extRowsMemSize = extRowsMemSize;
|
||||
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
||||
pHandle->extRowsOrderInfo = *pOrder;
|
||||
initRowIdSort(pHandle);
|
||||
if (!osTempSpaceAvailable()) {
|
||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
||||
qError("create sort mem file failed since %s, tempDir:%s", terrstr(), tsTempDir);
|
||||
return terrno;
|
||||
}
|
||||
int32_t code = createSortMemFile(pHandle);
|
||||
pHandle->bSortByRowId = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef struct SBlkMergeSupport {
|
||||
int64_t** aTs;
|
||||
int32_t* aRowIdx;
|
||||
|
@ -925,7 +1350,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx
|
|||
return sz;
|
||||
}
|
||||
|
||||
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
|
||||
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
|
||||
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
|
||||
|
@ -933,13 +1358,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
|||
blockDataCleanup(pHandle->pDataBlock);
|
||||
int32_t numBlks = taosArrayGetSize(aBlk);
|
||||
|
||||
SBlockOrderInfo* pOrigBlockOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
|
||||
SBlockOrderInfo* pHandleBlockOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
||||
SBlkMergeSupport sup;
|
||||
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
|
||||
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
|
||||
sup.order = order->order;
|
||||
sup.order = pOrigBlockOrder->order;
|
||||
for (int i = 0; i < numBlks; ++i) {
|
||||
SSDataBlock* blk = taosArrayGetP(aBlk, i);
|
||||
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId);
|
||||
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockOrder->slotId);
|
||||
sup.aTs[i] = (int64_t*)col->pData;
|
||||
sup.aRowIdx[i] = 0;
|
||||
}
|
||||
|
@ -963,16 +1390,17 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
|||
int32_t nMergedRows = 0;
|
||||
bool mergeLimitReached = false;
|
||||
size_t blkPgSz = pgHeaderSz;
|
||||
int64_t lastPageBufTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||
int64_t currTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||
int64_t lastPageBufTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||
int64_t currTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||
while (nRows < totalRows) {
|
||||
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
||||
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
||||
int32_t minRow = sup.aRowIdx[minIdx];
|
||||
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
|
||||
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
|
||||
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
|
||||
|
||||
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
|
||||
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -985,19 +1413,24 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
|||
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||
blockDataCleanup(pHandle->pDataBlock);
|
||||
blkPgSz = pgHeaderSz;
|
||||
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
|
||||
incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
|
||||
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
|
||||
|
||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||
mergeLimitReached = true;
|
||||
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
|
||||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
|
||||
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
|
||||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
|
||||
pHandle->currMergeLimitTs = lastPageBufTs;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
|
||||
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
||||
if (!pHandle->bSortByRowId) {
|
||||
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
||||
} else {
|
||||
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
|
||||
}
|
||||
blkPgSz += bufInc;
|
||||
|
||||
++nRows;
|
||||
|
@ -1011,7 +1444,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
|||
}
|
||||
if (pHandle->pDataBlock->info.rows > 0) {
|
||||
if (!mergeLimitReached) {
|
||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
|
||||
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1024,14 +1457,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
|||
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||
mergeLimitReached = true;
|
||||
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
|
||||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
|
||||
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
|
||||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
|
||||
pHandle->currMergeLimitTs = lastPageBufTs;
|
||||
}
|
||||
}
|
||||
}
|
||||
blockDataCleanup(pHandle->pDataBlock);
|
||||
}
|
||||
|
||||
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
|
||||
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
|
||||
|
||||
|
@ -1083,11 +1517,10 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH
|
|||
}
|
||||
|
||||
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
||||
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
|
||||
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
|
||||
|
||||
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
|
||||
|
||||
int32_t code = createPageBuf(pHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1098,7 +1531,8 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||
int32_t szSort = 0;
|
||||
|
||||
if (pOrder->order == TSDB_ORDER_ASC) {
|
||||
SBlockOrderInfo* pOrigOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
|
||||
if (pOrigOrder->order == TSDB_ORDER_ASC) {
|
||||
pHandle->currMergeLimitTs = INT64_MAX;
|
||||
} else {
|
||||
pHandle->currMergeLimitTs = INT64_MIN;
|
||||
|
@ -1110,7 +1544,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
while (1) {
|
||||
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
|
||||
|
||||
int64_t p = taosGetTimestampUs();
|
||||
bool bExtractedBlock = false;
|
||||
bool bSkipBlock = false;
|
||||
if (pBlk != NULL && pHandle->mergeLimit > 0) {
|
||||
|
@ -1121,13 +1554,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
if (pBlk != NULL) {
|
||||
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
|
||||
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId);
|
||||
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
||||
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||
if ((pOrigOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||
(pOrigOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||
if (bExtractedBlock) {
|
||||
blockDataDestroy(pBlk);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -1150,7 +1583,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
|
||||
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
|
||||
tSimpleHashClear(mUidBlk);
|
||||
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
|
||||
|
||||
int64_t p = taosGetTimestampUs();
|
||||
if (pHandle->bSortByRowId) {
|
||||
tsortOpenRegion(pHandle);
|
||||
}
|
||||
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
|
||||
blockDataDestroy(taosArrayGetP(aBlkSort, i));
|
||||
|
@ -1158,7 +1597,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
taosArrayClear(aBlkSort);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pHandle->bSortByRowId) {
|
||||
tsortCloseRegion(pHandle);
|
||||
}
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
|
@ -1195,6 +1636,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
taosArrayDestroy(aExtSrc);
|
||||
tSimpleHashCleanup(mTableNumRows);
|
||||
if (pHandle->bSortByRowId) {
|
||||
tsortFinalizeRegions(pHandle);
|
||||
}
|
||||
pHandle->type = SORT_SINGLESOURCE_SORT;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -457,6 +457,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
|||
COPY_SCALAR_FIELD(isCountByTag);
|
||||
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
|
||||
COPY_SCALAR_FIELD(paraTablesSort);
|
||||
COPY_SCALAR_FIELD(smallDataTsSort);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -690,6 +691,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
|
|||
COPY_SCALAR_FIELD(filesetDelimited);
|
||||
COPY_SCALAR_FIELD(needCountEmptyTable);
|
||||
COPY_SCALAR_FIELD(paraTablesSort);
|
||||
COPY_SCALAR_FIELD(smallDataTsSort);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -699,6 +699,7 @@ static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
|||
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
||||
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
||||
static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort";
|
||||
static const char* jkScanLogicPlanSmallDataTsSort = "SmallDataTsSort";
|
||||
|
||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||
|
@ -749,6 +750,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanSmallDataTsSort, pNode->paraTablesSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -800,7 +804,10 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
|||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort);
|
||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->smallDataTsSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanSmallDataTsSort, &pNode->smallDataTsSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1896,6 +1903,7 @@ static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
|||
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
||||
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
||||
static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort";
|
||||
static const char* jkTableScanPhysiPlanSmallDataTsSort = "SmallDataTsSort";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1973,6 +1981,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanSmallDataTsSort, pNode->smallDataTsSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2052,6 +2063,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanSmallDataTsSort, &pNode->smallDataTsSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -2188,6 +2188,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueBool(pEncoder, pNode->paraTablesSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueBool(pEncoder, pNode->smallDataTsSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2275,6 +2278,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvDecodeValueBool(pDecoder, &pNode->paraTablesSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvDecodeValueBool(pDecoder, &pNode->smallDataTsSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -404,6 +404,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
|
|||
case HINT_PARA_TABLES_SORT:
|
||||
if (paramNum > 0 || hasHint(*ppHintList, HINT_PARA_TABLES_SORT)) return true;
|
||||
break;
|
||||
case HINT_SMALLDATA_TS_SORT:
|
||||
if (paramNum > 0 || hasHint(*ppHintList, HINT_SMALLDATA_TS_SORT)) return true;
|
||||
break;
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
|
@ -490,6 +493,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
|
|||
}
|
||||
opt = HINT_PARA_TABLES_SORT;
|
||||
break;
|
||||
case TK_SMALLDATA_TS_SORT:
|
||||
lastComma = false;
|
||||
if (0 != opt || inParamList) {
|
||||
quit = true;
|
||||
break;
|
||||
}
|
||||
opt = HINT_SMALLDATA_TS_SORT;
|
||||
break;
|
||||
case TK_NK_LP:
|
||||
lastComma = false;
|
||||
if (0 == opt || inParamList) {
|
||||
|
|
|
@ -213,6 +213,7 @@ static SKeyword keywordTable[] = {
|
|||
{"SLIDING", TK_SLIDING},
|
||||
{"SLIMIT", TK_SLIMIT},
|
||||
{"SMA", TK_SMA},
|
||||
{"SMALLDATA_TS_SORT", TK_SMALLDATA_TS_SORT},
|
||||
{"SMALLINT", TK_SMALLINT},
|
||||
{"SNODE", TK_SNODE},
|
||||
{"SNODES", TK_SNODES},
|
||||
|
|
|
@ -47,7 +47,8 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);
|
|||
|
||||
bool getBatchScanOptionFromHint(SNodeList* pList);
|
||||
bool getSortForGroupOptHint(SNodeList* pList);
|
||||
bool getparaTablesSortOptHint(SNodeList* pList);
|
||||
bool getParaTablesSortOptHint(SNodeList* pList);
|
||||
bool getSmallDataTsSortOptHint(SNodeList* pList);
|
||||
bool getOptHint(SNodeList* pList, EHintOption hint);
|
||||
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
|
||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
||||
|
|
|
@ -502,7 +502,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
} else {
|
||||
nodesDestroyNode((SNode*)pScan);
|
||||
}
|
||||
pScan->paraTablesSort = getparaTablesSortOptHint(pSelect->pHint);
|
||||
pScan->paraTablesSort = getParaTablesSortOptHint(pSelect->pHint);
|
||||
pScan->smallDataTsSort = getSmallDataTsSortOptHint(pSelect->pHint);
|
||||
pCxt->hasScan = true;
|
||||
|
||||
return code;
|
||||
|
|
|
@ -527,7 +527,10 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
|
|||
if (NULL == pScan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
|
||||
if (pScanLogicNode->pVgroupList) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
}
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
|
||||
}
|
||||
|
||||
|
@ -538,8 +541,9 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
|
|||
if (NULL == pScan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
|
||||
if (pScanLogicNode->pVgroupList) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
}
|
||||
pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
|
||||
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||
|
@ -563,8 +567,9 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
|
|||
pScan->groupSort = pScanLogicNode->groupSort;
|
||||
pScan->ignoreNull = pScanLogicNode->igLastNull;
|
||||
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
|
||||
if (pScanLogicNode->pVgroupList) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
}
|
||||
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) {
|
||||
|
@ -609,8 +614,9 @@ static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
}
|
||||
|
||||
pScan->groupSort = pScanLogicNode->groupSort;
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
|
||||
if (pScanLogicNode->pVgroupList) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
}
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||
}
|
||||
|
||||
|
@ -652,6 +658,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
|||
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
|
||||
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
|
||||
pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
|
||||
pTableScan->smallDataTsSort = pScanLogicNode->smallDataTsSort;
|
||||
|
||||
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -680,7 +687,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
|
||||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
|
||||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
if (pScanLogicNode->pVgroupList) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
}
|
||||
} else {
|
||||
pSubplan->execNode.nodeId = MNODE_HANDLE;
|
||||
pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
|
@ -2216,11 +2225,12 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
|
|||
pInserter->stableId = pModify->stableId;
|
||||
pInserter->tableType = pModify->tableType;
|
||||
strcpy(pInserter->tableName, pModify->tableName);
|
||||
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
|
||||
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
|
||||
pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
|
||||
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
|
||||
if (pModify->pVgroupList) {
|
||||
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
|
||||
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
|
||||
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
}
|
||||
int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
|
||||
&pInserter->pCols);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -1320,7 +1320,11 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit
|
|||
SLogicNode* pSplitNode = NULL;
|
||||
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true, true);
|
||||
bool needSort = true;
|
||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pSplitNode) && !pSplitNode->pLimit && !pSplitNode->pSlimit) {
|
||||
needSort = !((SProjectLogicNode*)pSplitNode)->ignoreGroupId;
|
||||
}
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, needSort, needSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
|
|
|
@ -466,7 +466,7 @@ bool getOptHint(SNodeList* pList, EHintOption hint) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool getparaTablesSortOptHint(SNodeList* pList) {
|
||||
bool getParaTablesSortOptHint(SNodeList* pList) {
|
||||
if (!pList) return false;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pList) {
|
||||
|
@ -478,6 +478,18 @@ bool getparaTablesSortOptHint(SNodeList* pList) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool getSmallDataTsSortOptHint(SNodeList* pList) {
|
||||
if (!pList) return false;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pList) {
|
||||
SHintNode* pHint = (SHintNode*)pNode;
|
||||
if (pHint->option == HINT_SMALLDATA_TS_SORT) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SLogicNode* pCurr = (SLogicNode*)pNode;
|
||||
|
|
|
@ -892,7 +892,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
|
|||
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
|
||||
SMsgSendInfo *pSrc = src;
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
|
||||
SMsgSendInfo *pDst = taosMemoryCalloc(1, sizeof(*pSrc));
|
||||
if (NULL == pDst) {
|
||||
qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
|
||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
|
|
@ -353,7 +353,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
|
||||
} else {
|
||||
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
|
||||
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
|
||||
status == TASK_STATUS__STOP);
|
||||
int32_t code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
|
||||
|
|
|
@ -322,7 +322,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
|
||||
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
|
||||
conn->ctx.freeFunc(msg->ctx->ahandle);
|
||||
} else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
|
||||
} else if (msg->msg.info.notFreeAhandle == 0 && msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
|
||||
tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
|
||||
pThrd->destroyAhandleFp(msg->ctx->ahandle);
|
||||
}
|
||||
|
|
|
@ -1404,3 +1404,35 @@ int32_t taosLinkFile(char *src, char *dst) {
|
|||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
FILE* taosOpenCFile(const char* filename, const char* mode) {
|
||||
return fopen(filename, mode);
|
||||
}
|
||||
|
||||
int taosSeekCFile(FILE* file, int64_t offset, int whence) {
|
||||
#ifdef WINDOWS
|
||||
return _fseeki64(file, offset, whence);
|
||||
#else
|
||||
return fseeko(file, offset, whence);
|
||||
#endif
|
||||
}
|
||||
|
||||
size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream ) {
|
||||
return fread(buffer, size, count, stream);
|
||||
}
|
||||
|
||||
size_t taosWriteToCFile(const void* ptr, size_t size, size_t nitems, FILE* stream) {
|
||||
return fwrite(ptr, size, nitems, stream);
|
||||
}
|
||||
|
||||
int taosCloseCFile(FILE *f) {
|
||||
return fclose(f);
|
||||
}
|
||||
|
||||
int taosSetAutoDelFile(char* path) {
|
||||
#ifdef WINDOWS
|
||||
return SetFileAttributes(path, FILE_ATTRIBUTE_TEMPORARY);
|
||||
#else
|
||||
return unlink(path);
|
||||
#endif
|
||||
}
|
|
@ -449,6 +449,15 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int size = pCfg->array->size;
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SConfigItem *existItem = taosArrayGet(pCfg->array, i);
|
||||
if (existItem != NULL && strcmp(existItem->name, pItem->name) == 0) {
|
||||
taosMemoryFree(pItem->name);
|
||||
return TSDB_CODE_INVALID_CFG;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t len = strlen(name);
|
||||
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
||||
|
@ -457,6 +466,7 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
|
|||
if (pItem->dtype == CFG_DTYPE_STRING) {
|
||||
taosMemoryFree(pItem->str);
|
||||
}
|
||||
|
||||
taosMemoryFree(pItem->name);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
|
|
@ -113,11 +113,11 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
|
|||
echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG
|
||||
echo "fqdn ${HOSTNAME}" >> $TAOS_CFG
|
||||
echo "serverPort ${NODE}" >> $TAOS_CFG
|
||||
echo "supportVnodes 1024" >> $TAOS_CFG
|
||||
echo "supportVnodes 1024" >> $TAOS_CFG
|
||||
echo "statusInterval 1" >> $TAOS_CFG
|
||||
echo "dataDir $DATA_DIR" >> $TAOS_CFG
|
||||
echo "logDir $LOG_DIR" >> $TAOS_CFG
|
||||
echo "debugFlag 135" >> $TAOS_CFG
|
||||
echo "debugFlag 135" >> $TAOS_CFG
|
||||
echo "tmrDebugFlag 131" >> $TAOS_CFG
|
||||
echo "uDebugFlag 143" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 143" >> $TAOS_CFG
|
||||
|
@ -143,4 +143,5 @@ echo "asyncLog 0" >> $TAOS_CFG
|
|||
echo "locale en_US.UTF-8" >> $TAOS_CFG
|
||||
echo "telemetryReporting 0" >> $TAOS_CFG
|
||||
echo "querySmaOptimize 1" >> $TAOS_CFG
|
||||
echo "checkpointInterval 60" >> $TAOS_CFG
|
||||
echo " " >> $TAOS_CFG
|
||||
|
|
|
@ -15,42 +15,42 @@ fi
|
|||
|
||||
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
|
||||
while [ -n "$PID" ]; do
|
||||
echo kill -9 $PID
|
||||
#pkill -9 taosd
|
||||
kill -9 $PID
|
||||
echo kill -15 $PID
|
||||
#pkill -15 taosd
|
||||
kill -15 $PID
|
||||
echo "Killing taosd processes"
|
||||
if [ "$OS_TYPE" != "Darwin" ]; then
|
||||
fuser -k -n tcp 6030
|
||||
else
|
||||
lsof -nti:6030 | xargs kill -9
|
||||
lsof -nti:6030 | xargs kill -15
|
||||
fi
|
||||
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
|
||||
done
|
||||
|
||||
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'`
|
||||
while [ -n "$PID" ]; do
|
||||
echo kill -9 $PID
|
||||
echo kill -15 $PID
|
||||
#pkill -9 taos
|
||||
kill -9 $PID
|
||||
kill -15 $PID
|
||||
echo "Killing taos processes"
|
||||
if [ "$OS_TYPE" != "Darwin" ]; then
|
||||
fuser -k -n tcp 6030
|
||||
else
|
||||
lsof -nti:6030 | xargs kill -9
|
||||
lsof -nti:6030 | xargs kill -15
|
||||
fi
|
||||
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'`
|
||||
done
|
||||
|
||||
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
|
||||
while [ -n "$PID" ]; do
|
||||
echo kill -9 $PID
|
||||
#pkill -9 tmq_sim
|
||||
kill -9 $PID
|
||||
echo kill -15 $PID
|
||||
#pkill -15 tmq_sim
|
||||
kill -15 $PID
|
||||
echo "Killing tmq_sim processes"
|
||||
if [ "$OS_TYPE" != "Darwin" ]; then
|
||||
fuser -k -n tcp 6030
|
||||
else
|
||||
lsof -nti:6030 | xargs kill -9
|
||||
lsof -nti:6030 | xargs kill -15
|
||||
fi
|
||||
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
|
||||
done
|
||||
done
|
|
@ -226,4 +226,135 @@ print =============== clear
|
|||
# return -1
|
||||
#endi
|
||||
|
||||
print ================= step12
|
||||
|
||||
sql create database test2 vgroups 4;
|
||||
sql use test2;
|
||||
sql create stable stb (ts timestamp, c1 int) tags (t1 int);
|
||||
sql create table t1 using stb tags (1);
|
||||
sql create table t2 using stb tags (2);
|
||||
sql create table t3 using stb tags (3);
|
||||
sql create table t4 using stb tags (4);
|
||||
sql create table t5 using stb tags (4);
|
||||
sql create table t6 using stb tags (4);
|
||||
sql insert into t1 values ("2024-03-01 14:29:07.051", 11);
|
||||
sql insert into t2 values ("2024-03-01 14:29:07.051", 21);
|
||||
sql insert into t3 values ("2024-03-01 14:29:07.051", 31);
|
||||
sql insert into t4 values ("2024-03-01 14:29:07.051", 41);
|
||||
sql insert into t5 values ("2024-03-01 14:29:07.051", 51);
|
||||
sql insert into t6 values ("2024-03-01 14:29:07.051", 61);
|
||||
sql insert into t1 values ("2024-03-01 14:30:07.051", 12);
|
||||
sql insert into t2 values ("2024-03-01 14:30:07.051", 22);
|
||||
sql insert into t3 values ("2024-03-01 14:30:07.051", 32);
|
||||
sql insert into t4 values ("2024-03-01 14:30:07.051", 42);
|
||||
sql insert into t5 values ("2024-03-01 14:30:07.051", 52);
|
||||
sql insert into t6 values ("2024-03-01 14:30:07.051", 62);
|
||||
sql insert into t1 values ("2024-03-01 14:31:07.051", 13);
|
||||
sql insert into t2 values ("2024-03-01 14:31:07.051", 23);
|
||||
sql insert into t3 values ("2024-03-01 14:31:07.051", 33);
|
||||
sql insert into t4 values ("2024-03-01 14:31:07.051", 43);
|
||||
sql insert into t5 values ("2024-03-01 14:31:07.051", 53);
|
||||
sql insert into t6 values ("2024-03-01 14:31:07.051", 63);
|
||||
sql insert into t1 values ("2024-03-01 14:32:07.051", 14);
|
||||
sql insert into t2 values ("2024-03-01 14:32:07.051", 24);
|
||||
sql insert into t3 values ("2024-03-01 14:32:07.051", 34);
|
||||
sql insert into t4 values ("2024-03-01 14:32:07.051", 44);
|
||||
sql insert into t5 values ("2024-03-01 14:32:07.051", 54);
|
||||
sql insert into t6 values ("2024-03-01 14:32:07.051", 64);
|
||||
sql insert into t1 values ("2024-03-01 14:33:07.051", 15);
|
||||
sql insert into t2 values ("2024-03-01 14:33:07.051", 25);
|
||||
sql insert into t3 values ("2024-03-01 14:33:07.051", 35);
|
||||
sql insert into t4 values ("2024-03-01 14:33:07.051", 45);
|
||||
sql insert into t5 values ("2024-03-01 14:33:07.051", 55);
|
||||
sql insert into t6 values ("2024-03-01 14:33:07.051", 65);
|
||||
sql insert into t1 values ("2024-03-01 14:34:07.051", 16);
|
||||
sql insert into t2 values ("2024-03-01 14:34:07.051", 26);
|
||||
sql insert into t3 values ("2024-03-01 14:34:07.051", 36);
|
||||
sql insert into t4 values ("2024-03-01 14:34:07.051", 46);
|
||||
sql insert into t5 values ("2024-03-01 14:34:07.051", 56);
|
||||
sql insert into t6 values ("2024-03-01 14:34:07.051", 66);
|
||||
|
||||
sleep 300
|
||||
|
||||
sql select _wstart, count(*) from (select * from stb partition by tbname) interval(2s);
|
||||
|
||||
print $data00,$data01
|
||||
print $data10,$data11
|
||||
print $data20,$data21
|
||||
print $data30,$data31
|
||||
print $data40,$data41
|
||||
print $data50,$data51
|
||||
print $data60,$data61
|
||||
print $data70,$data71
|
||||
|
||||
if $rows != 6 then
|
||||
print $rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 6 then
|
||||
print $data01
|
||||
endi
|
||||
|
||||
if $data11 != 6 then
|
||||
print $data11
|
||||
endi
|
||||
|
||||
if $data21 != 6 then
|
||||
print $data21
|
||||
endi
|
||||
|
||||
if $data31 != 6 then
|
||||
print $data31
|
||||
endi
|
||||
|
||||
if $data41 != 6 then
|
||||
print $data41
|
||||
endi
|
||||
|
||||
if $data51 != 6 then
|
||||
print $data51
|
||||
endi
|
||||
|
||||
|
||||
sql select _wstart, count(*) from (select * from stb partition by tbname slimit 2) interval(2s);
|
||||
|
||||
print $data00,$data01
|
||||
print $data10,$data11
|
||||
print $data20,$data21
|
||||
print $data30,$data31
|
||||
print $data40,$data41
|
||||
print $data50,$data51
|
||||
print $data60,$data61
|
||||
print $data70,$data71
|
||||
|
||||
if $rows != 6 then
|
||||
print $rows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print $data01
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print $data11
|
||||
endi
|
||||
|
||||
if $data21 != 2 then
|
||||
print $data21
|
||||
endi
|
||||
|
||||
if $data31 != 2 then
|
||||
print $data31
|
||||
endi
|
||||
|
||||
if $data41 != 2 then
|
||||
print $data41
|
||||
endi
|
||||
|
||||
if $data51 != 2 then
|
||||
print $data51
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -8,16 +8,19 @@ sql connect
|
|||
print ===== step1
|
||||
sql drop stream if exists streams1;
|
||||
sql drop database if exists test;
|
||||
sql create database test vgroups 10;
|
||||
sql create database test vgroups 3;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table ts1 using st tags(1,1,1);
|
||||
sql create table ts2 using st tags(2,2,2);
|
||||
sql create table ts3 using st tags(3,2,2);
|
||||
sql create table ts4 using st tags(4,2,2);
|
||||
|
||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
|
||||
sleep 2000
|
||||
|
||||
sql_error create stream stream1_same_dst into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
|
||||
|
||||
sql pause stream streams1;
|
||||
|
||||
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
||||
|
@ -102,6 +105,11 @@ if $data11 != 4 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
print ===== idle for 70 sec for checkpoint gen
|
||||
sleep 70000
|
||||
|
||||
print ===== idle 60 sec completed , continue
|
||||
|
||||
print ===== step 1 over
|
||||
|
||||
print ===== step2
|
||||
|
@ -113,6 +121,12 @@ sql create table t1(ts timestamp, a int, b int , c int, d double);
|
|||
|
||||
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
|
||||
|
||||
# duplicate stream
|
||||
sql_error create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
|
||||
sql create stream if not exists streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql pause stream streams2;
|
||||
|
||||
sql insert into t1 values(1648791213001,1,12,3,1.0);
|
||||
|
@ -237,8 +251,9 @@ print ===== step 2 over
|
|||
print ===== step3
|
||||
sql drop stream if exists streams3;
|
||||
sql drop database if exists test3;
|
||||
sql create database test3 vgroups 10;
|
||||
sql create database test3 vgroups 3;
|
||||
sql use test3;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table ts1 using st tags(1,1,1);
|
||||
sql create table ts2 using st tags(2,2,2);
|
||||
|
|
|
@ -164,7 +164,7 @@ class TDTestCase:
|
|||
offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
|
||||
offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list))
|
||||
offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list))
|
||||
tdSql.checkEqual(offset_value_list1 == offset_value_list2, True)
|
||||
tdSql.checkEqual(offset_value_list1 <= offset_value_list2, True)
|
||||
tdSql.checkEqual(sum(offset_value_list1) >= 0, True)
|
||||
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
|
||||
tdSql.checkEqual(sum(rows_value_list), expected_res)
|
||||
|
@ -187,4 +187,4 @@ class TDTestCase:
|
|||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
|
|
|
@ -359,6 +359,44 @@ class TDTestCase:
|
|||
finally:
|
||||
consumer.close()
|
||||
|
||||
def consume_TS_4540_Test(self):
|
||||
tdSql.execute(f'create database if not exists test')
|
||||
tdSql.execute(f'use test')
|
||||
tdSql.execute(f'CREATE STABLE `test`.`b` ( `time` TIMESTAMP , `task_id` NCHAR(1000) ) TAGS( `key` NCHAR(1000))')
|
||||
tdSql.execute(f"insert into `test`.b1 using `test`.`b`(`key`) tags('1') (time, task_id) values ('2024-03-04 12:50:01.000', '32') `test`.b2 using `test`.`b`(`key`) tags('2') (time, task_id) values ('2024-03-04 12:50:01.000', '43') `test`.b3 using `test`.`b`(`key`) tags('3') (time, task_id) values ('2024-03-04 12:50:01.000', '123456')")
|
||||
|
||||
tdSql.execute(f'create topic tt as select tbname,task_id,`key` from b')
|
||||
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["tt"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
break
|
||||
val = res.value()
|
||||
if val is None:
|
||||
continue
|
||||
for block in val:
|
||||
data = block.fetchall()
|
||||
print(data)
|
||||
if data != [('b1', '32', '1')] and data != [('b2', '43', '2')] and data != [('b3', '123456', '3')]:
|
||||
tdLog.exit(f"index = 0 table b1 error")
|
||||
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
def consume_ts_4544(self):
|
||||
tdSql.execute(f'create database if not exists d1')
|
||||
tdSql.execute(f'use d1')
|
||||
|
@ -369,6 +407,7 @@ class TDTestCase:
|
|||
tdSql.execute(f'insert into tt1 using stt tags(1) values(now+5s, 11) (now+10s, 12)')
|
||||
|
||||
tdSql.execute(f'create topic topic_in as select * from stt where tbname in ("tt2")')
|
||||
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
|
@ -384,9 +423,37 @@ class TDTestCase:
|
|||
|
||||
consumer.close()
|
||||
|
||||
def consume_ts_4551(self):
|
||||
tdSql.execute(f'use d1')
|
||||
|
||||
tdSql.execute(f'create topic topic_stable as stable stt where tbname like "t%"')
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["topic_stable"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
break
|
||||
finally:
|
||||
consumer.close()
|
||||
print("consume_ts_4551 ok")
|
||||
|
||||
def run(self):
|
||||
self.consumeTest()
|
||||
self.consume_ts_4544()
|
||||
self.consume_ts_4551()
|
||||
self.consume_TS_4540_Test()
|
||||
|
||||
tdSql.prepare()
|
||||
self.checkWal1VgroupOnlyMeta()
|
||||
|
|
Loading…
Reference in New Issue