Merge remote-tracking branch 'origin/3.0' into feat/TD-27337

This commit is contained in:
dapan1121 2024-03-22 08:47:50 +08:00
commit 861a74cc93
11 changed files with 86 additions and 55 deletions

View File

@ -61,9 +61,6 @@ typedef enum {
int32_t grantCheck(EGrantType grant);
int32_t grantCheckExpire(EGrantType grant);
char* tGetMachineId();
#ifdef TD_UNIQ_GRANT
int32_t grantCheckLE(EGrantType grant);
#endif
// #ifndef GRANTS_CFG
#ifdef TD_ENTERPRISE
@ -76,7 +73,7 @@ int32_t grantCheckLE(EGrantType grant);
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "cpu_cores", .bytes = 13 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
}
#else
#define GRANTS_SCHEMA \
@ -88,7 +85,7 @@ int32_t grantCheckLE(EGrantType grant);
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "cpu_cores", .bytes = 13 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
}
#endif
// #define GRANT_CFG_ADD

View File

@ -359,21 +359,24 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
};
static const SSysDbTableSchema useGrantsFullSchema[] = {
static const SSysDbTableSchema userGrantsFullSchema[] = {
{.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "display_name", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "expire", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "limits", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
};
static const SSysDbTableSchema useGrantsLogsSchema[] = {
static const SSysDbTableSchema userGrantsLogsSchema[] = {
{.name = "state", .bytes = 1536 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "active", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "machine", .bytes = TSDB_GRANT_LOG_COL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
};
static const SSysDbTableSchema useMachinesSchema[] = {
static const SSysDbTableSchema userMachinesSchema[] = {
{.name = "id", .bytes = TSDB_CLUSTER_ID_LEN + 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#ifndef TD_UNIQ_GRANT
{.name = "dnode_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
#endif
{.name = "machine", .bytes = 7552 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
};
@ -406,9 +409,9 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_VIEWS, userViewsSchema, tListLen(userViewsSchema), false},
{TSDB_INS_TABLE_COMPACTS, userCompactsSchema, tListLen(userCompactsSchema), false},
{TSDB_INS_TABLE_COMPACT_DETAILS, userCompactsDetailSchema, tListLen(userCompactsDetailSchema), false},
{TSDB_INS_TABLE_GRANTS_FULL, useGrantsFullSchema, tListLen(useGrantsFullSchema), true},
{TSDB_INS_TABLE_GRANTS_LOGS, useGrantsLogsSchema, tListLen(useGrantsLogsSchema), true},
{TSDB_INS_TABLE_MACHINES, useMachinesSchema, tListLen(useMachinesSchema), true},
{TSDB_INS_TABLE_GRANTS_FULL, userGrantsFullSchema, tListLen(userGrantsFullSchema), true},
{TSDB_INS_TABLE_GRANTS_LOGS, userGrantsLogsSchema, tListLen(userGrantsLogsSchema), true},
{TSDB_INS_TABLE_MACHINES, userMachinesSchema, tListLen(userMachinesSchema), true},
{TSDB_INS_TABLE_ARBGROUPS, arbGroupsSchema, tListLen(arbGroupsSchema), true},
};

View File

@ -21,11 +21,5 @@
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
int32_t grantCheckExpire(EGrantType grant) { return TSDB_CODE_SUCCESS; }
#ifdef TD_UNIQ_GRANT
int32_t grantCheckLE(EGrantType grant) { return TSDB_CODE_SUCCESS; }
#endif
#else
#ifdef TD_UNIQ_GRANT
int32_t grantCheckExpire(EGrantType grant) { return TSDB_CODE_SUCCESS; }
#endif
#endif

View File

@ -217,8 +217,6 @@ typedef struct {
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
char ep[TSDB_EP_LEN];
char active[TSDB_ACTIVE_KEY_LEN];
char connActive[TSDB_CONN_ACTIVE_KEY_LEN];
char machineId[TSDB_MACHINE_ID_LEN + 1];
} SDnodeObj;

View File

@ -107,7 +107,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);

View File

@ -33,7 +33,7 @@
#include "taos_monitor.h"
#define TSDB_DNODE_VER_NUMBER 2
#define TSDB_DNODE_RESERVE_SIZE 64
#define TSDB_DNODE_RESERVE_SIZE 40
static const char *offlineReason[] = {
"",
@ -183,11 +183,10 @@ static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
SDB_SET_INT16(pRaw, dataPos, TSDB_ACTIVE_KEY_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->active, TSDB_ACTIVE_KEY_LEN, _OVER)
SDB_SET_INT16(pRaw, dataPos, TSDB_CONN_ACTIVE_KEY_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN, _OVER)
SDB_SET_INT16(pRaw, dataPos, 0, _OVER) // forward/backward compatible
SDB_SET_INT16(pRaw, dataPos, 0, _OVER) // forward/backward compatible
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
terrno = 0;
@ -227,13 +226,14 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
if (sver > 1) {
int16_t keyLen = 0;
SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->active, keyLen, _OVER)
SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->connActive, keyLen, _OVER)
SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
}
terrno = 0;
@ -271,12 +271,7 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew
mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime;
#ifdef TD_ENTERPRISE
if (strncmp(pOld->active, pNew->active, TSDB_ACTIVE_KEY_LEN) != 0) {
strncpy(pOld->active, pNew->active, TSDB_ACTIVE_KEY_LEN);
}
if (strncmp(pOld->connActive, pNew->connActive, TSDB_CONN_ACTIVE_KEY_LEN) != 0) {
strncpy(pOld->connActive, pNew->connActive, TSDB_CONN_ACTIVE_KEY_LEN);
}
tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
return 0;
}
@ -716,6 +711,35 @@ _OVER:
*/
}
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
int32_t code = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
if (pTrans == NULL) {
code = terrno;
goto _exit;
}
pDnode->updateTime = taosGetTimestampMs();
SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
code = terrno;
goto _exit;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
code = terrno;
goto _exit;
}
_exit:
mndTransDrop(pTrans);
return code;
}
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStatusReq statusReq = {0};
@ -885,8 +909,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
pDnode->memAvail = statusReq.memAvail;
pDnode->memTotal = statusReq.memTotal;
if (pDnode->machineId[0] == 0 && statusReq.machineId[0] != 0) {
if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
mndUpdateDnodeObj(pMnode, pDnode);
}
SStatusRsp statusRsp = {0};

View File

@ -513,6 +513,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.numOfTags = 1; // group id
createReq.pColumns = taosArrayInit_s(sizeof(SField), createReq.numOfColumns);
// build fields
for (int32_t i = 0; i < createReq.numOfColumns; i++) {
SField *pField = taosArrayGet(createReq.pColumns, i);
@ -586,12 +587,15 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
mndFreeStb(&stbObj);
mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb);
mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
return 0;
_OVER:
tFreeSMCreateStbReq(&createReq);
mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb);
mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno));
return -1;
}
@ -647,7 +651,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("stream:%s, start to create, sql:%s", createReq.name, createReq.sql);
mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
if (mndCheckCreateStreamReq(&createReq) != 0) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
@ -684,18 +688,21 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes");
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes");
if (pTrans == NULL) {
goto _OVER;
}
// create stb for stream
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE &&
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
} else {
mDebug("stream:%s no need create stable", createReq.name);
}
// schedule stream task for stream obj
if (mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList) < 0) {
@ -724,7 +731,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s tasks register into node list", createReq.name);
mDebug("stream stream:%s start to register tasks into task_node_list", createReq.name);
saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
@ -890,7 +897,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream");
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream");
if (pTrans == NULL) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
@ -1143,7 +1150,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream");
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1573,7 +1580,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream");
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream");
if (pTrans == NULL) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1662,7 +1669,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream");
if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1794,7 +1801,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// here create only one trans
if (pTrans == NULL) {
pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets");
pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets");
if (pTrans == NULL) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);

View File

@ -66,7 +66,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
}
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
@ -156,7 +156,7 @@ static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
}
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream");
STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) {
mError("failed to create trans to drop orphan tasks since %s", terrstr());
return -1;

View File

@ -161,8 +161,8 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId)
return TSDB_CODE_SUCCESS;
}
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -969,7 +969,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt
}
_exit:
for (int32_t i = 0; i < TARRAY_SIZE(pTagVals); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)TARRAY_GET_ELEM(pTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFreeClear(p->pData);

View File

@ -41,6 +41,12 @@
#define LOG_BUF_SIZE(x) ((x)->buffSize)
#define LOG_BUF_MUTEX(x) ((x)->buffMutex)
#ifdef TD_ENTERPRISE
#define LOG_EDITION_FLG ("E")
#else
#define LOG_EDITION_FLG ("C")
#endif
typedef struct {
char *buffer;
int32_t buffStart;
@ -490,8 +496,9 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
time_t curTime = timeSecs.tv_sec;
ptm = taosLocalTime(&curTime, &Tm, NULL);
return sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " %s", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId(), flags);
return sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " %s %s", ptm->tm_mon + 1, ptm->tm_mday,
ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId(),
LOG_EDITION_FLG, flags);
}
static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) {