other: merge 3.0

This commit is contained in:
Haojun Liao 2023-07-25 18:17:11 +08:00
commit 1c1bf63e1f
34 changed files with 242 additions and 175 deletions

View File

@ -214,19 +214,6 @@ The data of tdinsight dashboard is stored in `log` database (default. You can ch
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### logs table
`logs` table contains login information records.
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|level|VARCHAR||log level|
|content|NCHAR||log content|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### log\_summary table
`log_summary` table contains log summary information records.

View File

@ -1007,13 +1007,12 @@ consumer.close()
### Other sample programs
| Example program links | Example program content |
| ------------------------------------------------------------------------------------------------------------- | ------------------- ---- |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding,
bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py
|-----------------------|-------------------------|
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | parameter binding, bind one row at once |
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing |
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags |
| [tmq.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq.py) | TMQ subscription |
| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | TMQ subscription |
## Other notes

View File

@ -210,19 +210,6 @@ TDinsight dashboard 数据来源于 log 库存放监控数据的默认db
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### logs 表
`logs` 表记录登录信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|level|VARCHAR||log level|
|content|NCHAR||log content长度不超过1024字节|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### log\_summary 表
`log_summary` 记录日志统计信息。

View File

@ -85,8 +85,14 @@ extern int64_t tsVndCommitMaxIntervalMs;
extern int64_t tsMndSdbWriteDelta;
extern int64_t tsMndLogRetention;
extern int8_t tsGrant;
extern int32_t tsMndGrantMode;
extern bool tsMndSkipGrant;
// dnode
extern int64_t tsDndStart;
extern int64_t tsDndStartOsUptime;
extern int64_t tsDndUpTime;
// monitor
extern bool tsEnableMonitor;
extern int32_t tsMonitorInterval;

View File

@ -589,8 +589,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
//void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);

View File

@ -53,6 +53,7 @@ extern "C" {
#else
#include <argp.h>
#include <sys/prctl.h>
#include <sys/sysinfo.h>
#if defined(_TD_X86_)
#include <cpuid.h>
#endif

View File

@ -35,6 +35,7 @@ typedef struct {
bool taosCheckSystemIsLittleEnd();
void taosGetSystemInfo();
int64_t taosGetOsUptime();
int32_t taosGetEmail(char *email, int32_t maxLen);
int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t maxLen);
int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);

View File

@ -77,8 +77,14 @@ int64_t tsVndCommitMaxIntervalMs = 600 * 1000;
int64_t tsMndSdbWriteDelta = 200;
int64_t tsMndLogRetention = 2000;
int8_t tsGrant = 1;
int32_t tsMndGrantMode = 0;
bool tsMndSkipGrant = false;
// dnode
int64_t tsDndStart = 0;
int64_t tsDndStartOsUptime = 0;
int64_t tsDndUpTime = 0;
// monitor
bool tsEnableMonitor = true;
int32_t tsMonitorInterval = 30;
@ -506,6 +512,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "grantMode", tsMndGrantMode, 0, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER) != 0) return -1;
@ -915,6 +922,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64;
tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64;
tsMndSkipGrant = cfgGetItem(pCfg, "skipGrant")->bval;
tsMndGrantMode = cfgGetItem(pCfg, "grantMode")->i32;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));

View File

@ -373,6 +373,8 @@ int mainWindows(int argc, char **argv) {
dInfo("start to init service");
dmSetSignalHandle();
tsDndStart = taosGetTimestampMs();
tsDndStartOsUptime = taosGetOsUptime();
int32_t code = dmRun();
dInfo("shutting down the service");

View File

@ -24,12 +24,16 @@ static void *dmStatusThreadFp(void *param) {
const static int16_t TRIM_FREQ = 30;
int32_t trimCount = 0;
int32_t upTimeCount = 0;
int64_t upTime = 0;
while (1) {
taosMsleep(200);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
if (curTime < lastTime) lastTime = curTime;
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsStatusInterval) {
dmSendStatusReq(pMgmt);
lastTime = curTime;
@ -38,6 +42,11 @@ static void *dmStatusThreadFp(void *param) {
if (trimCount == 0) {
taosMemoryTrim(0);
}
if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
upTime = taosGetOsUptime() - tsDndStartOsUptime;
tsDndUpTime = TMAX(tsDndUpTime, upTime);
}
}
}
@ -54,7 +63,8 @@ static void *dmMonitorThreadFp(void *param) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
if (curTime < lastTime) lastTime = curTime;
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsMonitorInterval) {
(*pMgmt->sendMonitorReportFp)();
lastTime = curTime;

View File

@ -290,6 +290,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;

View File

@ -27,7 +27,7 @@ void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
float mndGetClusterUpTime(SMnode *pMnode);
int64_t mndGetClusterUpTime(SMnode *pMnode);
#ifdef __cplusplus
}

View File

@ -123,7 +123,7 @@ static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) {
#endif
}
float mndGetClusterUpTime(SMnode *pMnode) {
int64_t mndGetClusterUpTime(SMnode *pMnode) {
int64_t upTime = 0;
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
@ -132,7 +132,7 @@ float mndGetClusterUpTime(SMnode *pMnode) {
mndReleaseCluster(pMnode, pCluster, pIter);
}
return upTime / 86400.0f;
return upTime;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {

View File

@ -655,6 +655,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
STrans *pTrans = NULL;
SDnodeObj *pDnode = NULL;
bool cfgAll = pCfgReq->dnodeId == -1;
int32_t iter = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
@ -662,7 +663,8 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
if (cfgAll) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
} else if(!(pDnode = mndAcquireDnode(pMnode, pCfgReq->dnodeId))) {
++iter;
} else if (!(pDnode = mndAcquireDnode(pMnode, pCfgReq->dnodeId))) {
goto _OVER;
}
@ -699,7 +701,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
}
if (pTrans && mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
tsGrantHBInterval = TMIN(TMAX(5, iter / 2), 30);
terrno = 0;
_OVER:
@ -863,7 +865,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
code = mndCreateDnode(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
tsGrantHBInterval = 5;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr());

View File

@ -800,7 +800,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
if (pObj->id == pMnode->selfDnodeId) {
pClusterInfo->first_ep_dnode_id = pObj->id;
tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
pClusterInfo->master_uptime = (float)mndGetClusterUpTime(pMnode) / 86400.0f;
// pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
} else {

View File

@ -877,6 +877,7 @@ typedef struct SCacheRowsReader {
SSHashObj *pTableMap;
SArray *pLDataIterArray;
struct SDataFileReader *pFileReader;
STFileSet *pCurFileSet;
STsdbReadSnap *pReadSnap;
char *idstr;
int64_t lastTs;

View File

@ -1107,16 +1107,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int64_t st = taosGetTimestampMs();
// we have to continue retrying to successfully execute the scan history task.
while (1) {
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
break;
}
tqError("s-task:%s failed to start scan history in current time window, unexpected sched-status:%d, retry in 100ms",
id, schedStatus);
taosMsleep(100);
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING);
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
tqError(
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
"sched-status:%d",
id, schedStatus);
return 0;
}
ASSERT(pTask->status.pauseAllowed == false);
@ -1176,16 +1174,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(100);
}
streamTaskHalt(pTask);
// now we can stop the stream task execution
// todo upgrade the statu to be HALT from PAUSE or NORMAL
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, id);
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel,
id);
// if it's an source task, extract the last version in wal.
streamHistoryTaskSetVerRangeStep2(pTask);
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {

View File

@ -1949,35 +1949,39 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
STFileObj **pFileObj = state->pFileSet->farr;
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;
if (state->pFileSet != state->pr->pCurFileSet) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.tsdbPageSize};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;
conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;
conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;
conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
}
if (pFileObj[3] != NULL) {
conf.files[3].exist = true;
conf.files[3].file = *pFileObj[3]->f;
filesName[3] = pFileObj[3]->fname;
}
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
loadDataTomb(state->pr, state->pr->pFileReader);
state->pr->pCurFileSet = state->pFileSet;
}
if (pFileObj[3] != NULL) {
conf.files[3].exist = true;
conf.files[3].file = *pFileObj[3]->f;
filesName[3] = pFileObj[3]->fname;
}
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
loadDataTomb(state->pr, state->pr->pFileReader);
if (!state->pIndexList) {
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
} else {
@ -2053,7 +2057,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
}
if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}
clearLastFileSet(state);
state->state = SFSNEXTROW_FILESET;
@ -2154,7 +2160,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
}
if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}
*ppRow = &state->row;
--state->iRow;
@ -2214,19 +2222,6 @@ _err:
return code;
}
int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;
SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}
clearLastFileSet(state);
return code;
}
typedef enum SMEMNEXTROWSTATES {
SMEMNEXTROW_ENTER,
SMEMNEXTROW_NEXT,
@ -2346,6 +2341,36 @@ typedef struct CacheNextRowIter {
STsdb *pTsdb;
} CacheNextRowIter;
int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;
SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}
if (state->pBlockData) {
tBlockDataDestroy(state->pBlockData);
state->pBlockData = NULL;
}
if (state->pTSRow) {
taosMemoryFree(state->pTSRow);
state->pTSRow = NULL;
}
if (state->pRowIter->pSkyline) {
taosArrayDestroy(state->pRowIter->pSkyline);
state->pRowIter->pSkyline = NULL;
}
return code;
}
static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
@ -2359,6 +2384,8 @@ static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pr->pFileReader) {
tsdbDataFileReaderClose(&state->pr->pFileReader);
state->pr->pFileReader = NULL;
state->pr->pCurFileSet = NULL;
}
if (state->pTSRow) {

View File

@ -189,6 +189,10 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
committer->ctx->maxKey = committer->ctx->maxKey + 1;
}
if (record->ekey > committer->ctx->maxKey) {
committer->ctx->nextKey = committer->ctx->maxKey;
}
record->skey = TMAX(record->skey, committer->ctx->minKey);
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
@ -602,4 +606,4 @@ _exit:
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
}

View File

@ -112,7 +112,10 @@ static char* getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t
p += titoa(TD_VID(pTsdb->pVnode), 10, p);
*(p++) = 'f';
p += titoa(fid, 10, p);
if (fid < 0) {
*(p++) = '-';
}
p += titoa((fid < 0) ? -fid : fid, 10, p);
memcpy(p, "ver", 3);
p += 3;

View File

@ -352,9 +352,9 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
return TSDB_CODE_SUCCESS;
}
static int32_t uidComparFn(const void* p1, const void* p2) {
const uint64_t* uid1 = p1;
const uint64_t* uid2 = p2;
static int32_t uidComparFn(const void *p1, const void *p2) {
const uint64_t *uid1 = p1;
const uint64_t *uid2 = p2;
return (*uid1) - (*uid2);
}
@ -372,16 +372,16 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
}
}
// for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
// SStatisBlk *p = &pStatisBlkArray->data[i];
// if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
// break;
// }
//
// if (p->maxTbid.uid < uid) {
// break;
// }
// }
// for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
// SStatisBlk *p = &pStatisBlkArray->data[i];
// if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
// break;
// }
//
// if (p->maxTbid.uid < uid) {
// break;
// }
// }
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
return false;
@ -416,7 +416,7 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs();
const TSttBlkArray*pSttBlkArray = NULL;
const TSttBlkArray *pSttBlkArray = NULL;
pBlockLoadInfo->sttBlockLoaded = true;
// load the stt block info for each stt-block
@ -445,12 +445,12 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
}
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
// if (!exists) {
// pIter->iSttBlk = -1;
// pIter->pSttBlk = NULL;
// return TSDB_CODE_SUCCESS;
// }
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
// if (!exists) {
// pIter->iSttBlk = -1;
// pIter->pSttBlk = NULL;
// return TSDB_CODE_SUCCESS;
// }
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
// the skey is updated.
@ -794,7 +794,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
// open stt file reader if not
if (pSttFileReader == NULL) {
SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.szPage};
SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.tsdbPageSize};
conf.file[0] = *pSttLevel->fobjArr->data[i]->f;
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);

View File

@ -189,7 +189,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
STFileObj** pFileObj = pReader->status.pCurrentFileset->farr;
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
SDataFileReaderConfig conf = {.tsdb = pReader->pTsdb, .szPage = pReader->pTsdb->pVnode->config.szPage};
SDataFileReaderConfig conf = {.tsdb = pReader->pTsdb, .szPage = pReader->pTsdb->pVnode->config.tsdbPageSize};
const char* filesName[4] = {0};

View File

@ -749,7 +749,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
int64_t size;
TdFilePtr pOutFD = NULL;
TdFilePtr PInFD = NULL;
int32_t szPage = pTsdb->pVnode->config.szPage;
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
char fNameFrom[TSDB_FILENAME_LEN];
char fNameTo[TSDB_FILENAME_LEN];

View File

@ -2881,15 +2881,23 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
int64_t mergeLimit = -1;
if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
if (hasLimit) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
if (hasLimit) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
} else {
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
}
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
// one table has one data block

View File

@ -54,19 +54,19 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
int32_t numOfCols = 0;
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
pOperator->exprSupp.numOfExprs = numOfCols;
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = -1;
if (pSortNode->node.pLimit) {
SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit;
}
int32_t numOfOutputCols = 0;
int32_t code =
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = -1;
if (pSortNode->node.pLimit) {
SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit + pLimit->offset;
}
pOperator->exprSupp.pCtx =
createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);

View File

@ -51,6 +51,7 @@ struct SSortHandle {
uint32_t tmpRowIdx;
int64_t mergeLimit;
int64_t currMergeLimitTs;
int32_t sourceId;
SSDataBlock* pDataBlock;
@ -921,7 +922,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
int64_t lastPageBufTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
@ -929,14 +931,21 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
}
}
@ -955,8 +964,17 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
}
}
blockDataCleanup(pHandle->pDataBlock);
}
@ -982,11 +1000,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
if (pOrder->order == TSDB_ORDER_ASC) {
pHandle->currMergeLimitTs = INT64_MAX;
} else {
pHandle->currMergeLimitTs = INT64_MIN;
}
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
continue;
}
}
if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk);
@ -1374,6 +1405,9 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
}
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
if (pHandle->pDataBlock == NULL) { // when no input stream datablock
return NULL;
}
blockDataCleanup(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
// abandon the top tuple if queue size bigger than max size

View File

@ -547,7 +547,7 @@ void monSendReport() {
monGenGrantJson(pMonitor);
monGenDnodeJson(pMonitor);
monGenDiskJson(pMonitor);
monGenLogJson(pMonitor);
//monGenLogJson(pMonitor); // TS-3691
char *pCont = tjsonToString(pMonitor->pJson);
// uDebugL("report cont:%s\n", pCont);

View File

@ -2934,14 +2934,14 @@ static int32_t createMultiResFuncsFromStar(STranslateContext* pCxt, SFunctionNod
static int32_t createTags(STranslateContext* pCxt, SNodeList** pOutput) {
if (QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pCxt->pCurrStmt)->pFromTable)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_PC,
"The _TAGS pseudo column can only be used for subtable and supertable queries");
"The _TAGS pseudo column can only be used for child table and super table queries");
}
SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pCxt->pCurrStmt)->pFromTable);
const STableMeta* pMeta = pTable->pMeta;
if (TSDB_SUPER_TABLE != pMeta->tableType && TSDB_CHILD_TABLE != pMeta->tableType) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_PC,
"The _TAGS pseudo column can only be used for subtable and supertable queries");
"The _TAGS pseudo column can only be used for child table and super table queries");
}
SSchema* pTagsSchema = getTableTagSchema(pMeta);

View File

@ -164,6 +164,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "%s function is not supported in fill query";
case TSDB_CODE_PAR_INVALID_WINDOW_PC:
return "_WSTART, _WEND and _WDURATION can only be used in window query";
case TSDB_CODE_PAR_INVALID_TAGS_PC:
return "Tags can only applied to super table and child table";
case TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC:
return "%s function is not supported in time window query";
case TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC:

View File

@ -687,6 +687,9 @@ int32_t streamTryExec(SStreamTask* pTask) {
streamSchedExec(pTask);
}
}
} else {
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
}
return 0;

View File

@ -624,9 +624,8 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
return qStreamRecoverSetAllStepFinished(exec);
}
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) {
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
SVersionRange* pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
ASSERT(latestVer >= pRange->maxVer);
int64_t nextStartVer = pRange->maxVer + 1;

View File

@ -961,6 +961,18 @@ char *taosGetCmdlineByPID(int pid) {
#endif
}
int64_t taosGetOsUptime() {
#ifdef WINDOWS
#elif defined(_TD_DARWIN_64)
#else
struct sysinfo info;
if (0 == sysinfo(&info)) {
return (int64_t)info.uptime * 1000;
}
#endif
return 0;
}
void taosSetCoreDump(bool enable) {
if (!enable) return;
#ifdef WINDOWS

View File

@ -351,10 +351,10 @@ int32_t titoa(uint64_t val, size_t radix, char str[]) {
int32_t i = 0;
uint64_t v = val;
while(v > 0) {
do {
buf[i++] = s[v % radix];
v /= radix;
}
} while (v > 0);
// reverse order
for(int32_t j = 0; j < i; ++j) {

View File

@ -186,33 +186,6 @@ class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
tdLog.exit("total is null!")
# log_infos ====================================
if "log_infos" not in infoDict or infoDict["log_infos"]== None:
tdLog.exit("log_infos is null!")
if "logs" not in infoDict["log_infos"] or len(infoDict["log_infos"]["logs"]) < 8:#!= 10:
tdLog.exit("logs is null!")
if "ts" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 10:
tdLog.exit("ts is null!")
if "level" not in infoDict["log_infos"]["logs"][0] or infoDict["log_infos"]["logs"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!")
if "content" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 1:
tdLog.exit("content is null!")
if "summary" not in infoDict["log_infos"] or len(infoDict["log_infos"]["summary"])!= 4:
tdLog.exit("summary is null!")
if "total" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["total"] < 0 :
tdLog.exit("total is null!")
if "level" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!")
def do_GET(self):
"""
process GET request
@ -315,4 +288,3 @@ class TDTestCase:
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())