Merge branch '3.0' into enh/TS-3737-3.0

This commit is contained in:
kailixu 2024-06-07 14:09:23 +08:00
commit 054cb9b64a
31 changed files with 335 additions and 249 deletions

View File

@ -11,6 +11,10 @@ ELSE ()
SET(TD_VER_COMPATIBLE "3.0.0.0")
ENDIF ()
IF (TD_PRODUCT_NAME)
ADD_DEFINITIONS(-DTD_PRODUCT_NAME="${TD_PRODUCT_NAME}")
ENDIF ()
find_program(HAVE_GIT NAMES git)
IF (DEFINED GITINFO)

View File

@ -73,7 +73,7 @@ char *tGetMachineId();
#ifdef TD_ENTERPRISE
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
@ -85,7 +85,7 @@ char *tGetMachineId();
#else
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \

View File

@ -224,13 +224,14 @@
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB, "s3migrate-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB_TIMER, "s3migrate-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "mnd-unused2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TSMA, "create-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STB_DROP, "drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
@ -275,7 +276,6 @@
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
@ -286,6 +286,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_S3MIGRATE, "vnode-s3migrate", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ARB_HEARTBEAT, "vnode-arb-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_VND_MSG)
@ -318,11 +319,12 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED, "stream-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
@ -369,8 +371,9 @@
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_UNUSED, "vnd-stream-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8

View File

@ -78,6 +78,14 @@ extern const int32_t TYPE_BYTES[21];
#define TSDB_DEFAULT_PASS "taosdata"
#endif
#ifndef TD_PRODUCT_NAME
#ifdef TD_ENTERPRISE
#define TD_PRODUCT_NAME "TDengine Enterprise Edition"
#else
#define TD_PRODUCT_NAME "TDengine Community Edition"
#endif
#endif
#define TSDB_TRUE 1
#define TSDB_FALSE 0
#define TSDB_OK 0

View File

@ -826,7 +826,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.endOffset;
offRows->ever = pVg->offsetInfo.walVerEnd;
offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,

View File

@ -258,17 +258,12 @@ static void dmPrintArgs(int32_t argc, char const *argv[]) {
static void dmGenerateGrant() { mndGenerateMachineCode(); }
static void dmPrintVersion() {
printf("%s\ntaosd version: %s compatible_version: %s\n", TD_PRODUCT_NAME, version, compatible_version);
printf("git: %s\n", gitinfo);
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
printf("git: %s\n", gitinfoOfInternal);
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
#ifdef TD_ENTERPRISE
printf("gitinfoOfInternal: %s\n", gitinfoOfInternal);
#endif
printf("buildInfo: %s\n", buildinfo);
printf("build: %s\n", buildinfo);
}
static void dmPrintHelp() {

View File

@ -106,8 +106,8 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode);
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t extractStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
@ -121,6 +121,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
void mndInitExecInfo();
void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo);
#ifdef __cplusplus
}

View File

@ -36,8 +36,7 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
if (pShow->numOfRows < 1) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
const char *src = "community";
STR_WITH_MAXSIZE_TO_VARSTR(tmp, src, 32);
STR_WITH_MAXSIZE_TO_VARSTR(tmp, TD_PRODUCT_NAME, 32);
colDataSetVal(pColInfo, numOfRows, tmp, false);
GRANT_ITEM_SHOW("unlimited");

View File

@ -56,13 +56,13 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -792,7 +792,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name);
saveStreamTasksInfo(&streamObj, &execInfo);
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
// execute creation
@ -815,7 +815,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
} else {
char detail[1000] = {0};
sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
}
@ -827,6 +827,7 @@ _OVER:
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createReq);
tFreeStreamObj(&streamObj);
if (sql != NULL) {
taosMemoryFreeClear(sql);
}
@ -839,6 +840,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
int64_t maxChkptId = 0;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
@ -1031,10 +1033,9 @@ _ERR:
return code;
}
int32_t initStreamNodeList(SMnode *pMnode) {
int32_t extractStreamNodeList(SMnode *pMnode) {
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
extractNodeListFromStream(pMnode, execInfo.pNodeList);
}
return taosArrayGetSize(execInfo.pNodeList);
@ -1044,7 +1045,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
// check if the node update happens or not
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
int32_t numOfNodes = extractStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
@ -1071,22 +1072,23 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) {
mDebug("stream task not ready due to node update");
mDebug("stream tasks not ready due to node update");
}
taosThreadMutexUnlock(&execInfo.lock);
return nodeUpdated;
}
static int32_t mndCheckNodeStatus(SMnode *pMnode) {
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
int64_t ts = taosGetTimestampSec();
if (taskNodeIsUpdated(pMnode)) {
return -1;
}
@ -1094,7 +1096,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
taosThreadMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts;
ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0);
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
@ -1148,12 +1150,13 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t code = 0;
int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
return code;
}
SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval));
int64_t now = taosGetTimestampMs();
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
int64_t duration = now - pStream->checkpointFreq;
if (duration < tsStreamCheckpointInterval * 1000) {
@ -1572,7 +1575,9 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId);
mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
" no valid status/stage info",
id.taskId, pStream->name, pStream->uid, pStream->createTime);
return -1;
}
@ -2164,7 +2169,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return 0;
}
static SArray *extractNodeListFromStream(SMnode *pMnode) {
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -2193,13 +2198,13 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
sdbRelease(pSdb, pStream);
}
SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry));
taosArrayClear(pNodeList);
// convert to list
pIter = NULL;
while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
SNodeEntry *pEntry = (SNodeEntry *)pIter;
taosArrayPush(plist, pEntry);
taosArrayPush(pNodeList, pEntry);
char buf[256] = {0};
epsetToStr(&pEntry->epset, buf, tListLen(buf));
@ -2207,27 +2212,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
taosHashCleanup(pHash);
return plist;
}
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
break;
}
}
return TSDB_CODE_SUCCESS;
}
@ -2244,7 +2228,7 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) {
return false;
}
int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) {
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
@ -2262,10 +2246,7 @@ int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) {
}
}
for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId *pId = taosArrayGet(pRemovedTasks, i);
doRemoveTasks(&execInfo, pId);
}
removeTasksInBuf(pRemovedTasks, &execInfo);
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList));
@ -2292,7 +2273,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
int32_t numOfNodes = extractStreamNodeList(pMnode);
taosThreadMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) {
@ -2312,7 +2293,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
}
taosThreadMutexLock(&execInfo.lock);
removeExpiredNodeEntryAndTask(pNodeSnapshot);
removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
@ -2322,8 +2304,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
// keep the new vnode snapshot if success
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
extractNodeListFromStream(pMnode, execInfo.pNodeList);
execInfo.ts = ts;
mDebug("create trans successfully, update cached node list, numOfNodes:%d",
(int)taosArrayGetSize(execInfo.pNodeList));
@ -2360,7 +2341,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
@ -2400,37 +2381,6 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
destroyStreamTaskIter(pIter);
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
}
}
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {

View File

@ -22,7 +22,7 @@ typedef struct SFailedCheckpointInfo {
int32_t transId;
} SFailedCheckpointInfo;
static void doExtractTasksFromStream(SMnode *pMnode) {
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -33,11 +33,44 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
break;
}
saveStreamTasksInfo(pStream, &execInfo);
saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
sdbRelease(pSdb, pStream);
}
}
static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
if (pMnode == NULL) {
return;
}
int32_t num = taosArrayGetSize(pExecInfo->pTaskList);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SArray *pIdList = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < num; ++i) {
STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i);
void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t));
if (p != NULL) {
continue;
}
void* pObj = mndGetStreamObj(pMnode, pId->streamId);
if (pObj != NULL) {
mndReleaseStream(pMnode, pObj);
taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0);
} else {
taosArrayPush(pIdList, pId);
}
}
removeTasksInBuf(pIdList, &execInfo);
taosArrayDestroy(pIdList);
taosHashCleanup(pHash);
}
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) {
@ -230,7 +263,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
SArray *pFailedTasks = NULL;
SArray *pFailedChkpt = NULL;
SArray *pOrphanTasks = NULL;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
@ -252,17 +285,21 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
if (taosHashGetSize(execInfo.pTaskMap) == 0) {
doExtractTasksFromStream(pMnode);
addAllStreamTasksIntoBuf(pMnode, &execInfo);
} else {
// the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty.
// let's drop them here.
removeDroppedStreamTasksInBuf(pMnode, &execInfo);
}
initStreamNodeList(pMnode);
extractStreamNodeList(pMnode);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
@ -290,16 +327,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
} else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++;
} else {
pTaskEntry->inputQChanging = false;
}
} else {
pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0;
}
// if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
// if (!pTaskEntry->inputQChanging) {
// pTaskEntry->inputQUnchangeCounter++;
// } else {
// pTaskEntry->inputQChanging = false;
// }
// } else {
// pTaskEntry->inputQChanging = true;
// pTaskEntry->inputQUnchangeCounter = 0;
// }
streamTaskStatusCopy(pTaskEntry, p);
@ -310,7 +347,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SFailedCheckpointInfo info = {
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedTasks, &info);
addIntoCheckpointList(pFailedChkpt, &info);
}
}
@ -328,7 +365,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pFailedTasks) > 0) {
if (taosArrayGetSize(pFailedChkpt) > 0) {
bool allReady = true;
if (pMnode != NULL) {
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
@ -339,8 +376,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
for(int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
@ -359,7 +396,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedTasks);
taosArrayDestroy(pFailedChkpt);
taosArrayDestroy(pOrphanTasks);
{

View File

@ -26,6 +26,8 @@ struct SStreamTaskIter {
SStreamTask *pTask;
};
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (pIter == NULL) {
@ -598,3 +600,49 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList));
}
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num);
break;
}
}
return TSDB_CODE_SUCCESS;
}
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
STaskId *pId = taosArrayGet(pTaskIds, i);
doRemoveTasks(pExecInfo, pId);
}
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
doRemoveTasks(pExecNode, &id);
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}

View File

@ -138,7 +138,6 @@ void initStreamExecInfo() {
}
void initNodeInfo() {
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
SNodeEntry entry = {0};
entry.nodeId = 2;
entry.stageUpdated = true;
@ -207,27 +206,32 @@ TEST_F(StreamTest, kill_checkpoint_trans) {
killAllCheckpointTrans(pMnode, &info);
SStreamObj stream;
memset(&stream, 0, sizeof(SStreamObj));
void* p = alloca(sizeof(SStreamObj) + sizeof(SSdbRow));
SSdbRow* pRow = static_cast<SSdbRow*>(p);
pRow->type = SDB_MAX;
stream.uid = defStreamId;
stream.lock = 0;
stream.tasks = taosArrayInit(1, POINTER_BYTES);
stream.pHTasksList = taosArrayInit(1, POINTER_BYTES);
SStreamObj* pStream = (SStreamObj*)((char*)p + sizeof(SSdbRow));
memset(pStream, 0, sizeof(SStreamObj));
pStream->uid = defStreamId;
pStream->lock = 0;
pStream->tasks = taosArrayInit(1, POINTER_BYTES);
pStream->pHTasksList = taosArrayInit(1, POINTER_BYTES);
SArray* pLevel = taosArrayInit(1, POINTER_BYTES);
SStreamTask* pTask = static_cast<SStreamTask*>(taosMemoryCalloc(1, sizeof(SStreamTask)));
pTask->id.streamId = defStreamId;
pTask->id.taskId = 1;
pTask->exec.qmsg = (char*)taosMemoryMalloc(1);
pTask->exec.qmsg = (char*)taosMemoryCalloc(1,1);
taosThreadMutexInit(&pTask->lock, NULL);
taosArrayPush(pLevel, &pTask);
taosArrayPush(stream.tasks, &pLevel);
mndCreateStreamResetStatusTrans(pMnode, &stream);
taosArrayPush(pStream->tasks, &pLevel);
mndCreateStreamResetStatusTrans(pMnode, pStream);
tFreeStreamObj(&stream);
tFreeStreamObj(pStream);
sdbCleanup(pMnode->pSdb);
taosMemoryFree(pMnode);

View File

@ -162,7 +162,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else if (type == STREAM_INPUT__CHECKPOINT) {
} else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
SPackedData tmp = {.pDataBlock = input};
taosArrayPush(pInfo->pBlockLists, &tmp);
pInfo->blockType = STREAM_INPUT__CHECKPOINT;

View File

@ -989,13 +989,15 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa
colDataSetNULL(pColInfo, pBlock->info.rows);
}
}
if (bFreeRow) {
taosMemoryFree(buf);
}
if (*(int32_t*)pStart != pStart - buf) {
qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart,
(int32_t)(pStart - buf));
};
}
if (bFreeRow) {
taosMemoryFree(buf);
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;

View File

@ -1242,14 +1242,9 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
}
static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildInfo: %s\n", buildinfo);
printf("udfd version: %s compatible_version: %s\n", version, compatible_version);
printf("git: %s\n", gitinfo);
printf("build: %s\n", buildinfo);
}
static int32_t udfdInitLog() {

View File

@ -454,7 +454,12 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
scanPathOptSetGroupOrderScan(info.pScan);
}
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
if (pCxt->pPlanCxt->streamQuery) {
info.pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; // always load all data for stream query
} else {
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
}
info.pScan->pDynamicScanFuncs = info.pDsoFuncs;
}
if (TSDB_CODE_SUCCESS == code && info.pScan) {

View File

@ -224,6 +224,8 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
typedef int32_t (*__stream_async_exec_fn_t)(void* param);
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code);
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
#ifdef __cplusplus
}
#endif

View File

@ -195,12 +195,6 @@ int32_t getCfIdx(const char* cfName) {
bool isValidCheckpoint(const char* dir) {
return true;
STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir);
if (pDb == NULL) {
return false;
}
taskDbDestroy(pDb, false);
return true;
}
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {

View File

@ -276,6 +276,12 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
int8_t type = pTask->outputInfo.type;
pActiveInfo->allUpstreamTriggerRecv = 1;
// We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
// The transfer of state may generate new data that need to dispatch to downstream tasks,
// Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
// before the next checkpoint.
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointTriggerBlock(pBlock, pTask);
@ -306,8 +312,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamTaskBuildCheckpoint(pTask);
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
// Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
// this task already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
}
}
@ -507,7 +516,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks);
}
streamMetaWLock(pMeta);
@ -526,6 +535,8 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
}
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
char buf[128] = {0};
char* file = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
@ -537,12 +548,17 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
}
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
char buf[128] = {0};
if (pFile == NULL) {
stError("%s failed to open meta file:%s for checkpoint", id, file);
code = -1;
return code;
}
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
stError("chkp failed to read meta file:%s", file);
stError("%s failed to read meta file:%s for checkpoint", id, file);
code = -1;
} else {
int32_t len = strlen(buf);
int32_t len = strnlen(buf, tListLen(buf));
for (int i = 0; i < len; i++) {
if (buf[i] == '\n') {
char* item = taosMemoryCalloc(1, i + 1);

View File

@ -246,6 +246,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
return code;
}
@ -730,8 +731,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), quit from timer and clear "
"checkpoint-ready msg, ref:%d",
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
"and quit from timer, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pTask);

View File

@ -24,6 +24,7 @@
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState* pState = streamTaskGetStatus(pTask);
@ -87,8 +88,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return code;
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
int32_t* totalBlocks) {
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
@ -541,6 +541,75 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) {
const char* id = pTask->id.idStr;
int32_t blockSize = 0;
int64_t st = taosGetTimestampMs();
SCheckpointInfo* pInfo = &pTask->chkInfo;
int64_t ver = pInfo->processedVer;
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger");
doSetStreamInputBlock(pTask, pBlock, &ver, id);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
}
}
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
const char* id = pTask->id.idStr;
// 1. transfer the ownership of executor state
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
} else {
stDebug("s-task:%s no transfer-state needed", id);
}
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
doStreamTaskExecImpl(pTask, pCheckpointBlock);
}
/**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
@ -628,63 +697,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
}
if (type == STREAM_INPUT__CHECKPOINT) {
// transfer the state from fill-history to related stream task before generating the checkpoint.
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
// 2. transfer the ownership of executor state
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
}
}
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type));
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
SCheckpointInfo* pInfo = &pTask->chkInfo;
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
if (type != STREAM_INPUT__CHECKPOINT) {
doStreamTaskExecImpl(pTask, pInput);
}
streamFreeQitem(pInput);
@ -692,7 +706,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
@ -717,8 +730,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
return 0;
}
}
return 0;
}
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not

View File

@ -298,6 +298,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (code != 0) {
taosArrayDestroy(pSnapInfoSet);
return -1;
}

View File

@ -624,18 +624,19 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
if ((pInfo != NULL) && pInfo->dataAllowed) {
pInfo->dataAllowed = false;
int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t <= streamTaskGetNumOfUpstream(pTask));
}
}
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = true;
if ((pInfo != NULL) && (!pInfo->dataAllowed)) {
int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t >= 0);
pInfo->dataAllowed = true;
}
}

View File

@ -2,6 +2,6 @@ char version[64] = "${TD_VER_NUMBER}";
char compatible_version[12] = "${TD_VER_COMPATIBLE}";
char gitinfo[48] = "${TD_VER_GIT}";
char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}";
char buildinfo[64] = "Built ${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} at ${TD_VER_DATE}";
char buildinfo[64] = "${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}";
void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {};

View File

@ -269,6 +269,10 @@ class TDTestCase:
def ins_grants_check(self):
grant_name_dict = {
'service':'Service Time',
'timeseries':'Timeseries',
'dnodes':'Dnodes',
'cpu_cores':'CPU Cores',
'stream':'Stream',
'subscription':'Subscription',
'view':'View',
@ -293,6 +297,7 @@ class TDTestCase:
'mysql':'MySQL',
'postgres':'PostgreSQL',
'oracle':'Oracle',
'mssql':'SqlServer'
}
tdSql.execute('drop database if exists db2')
@ -304,7 +309,7 @@ class TDTestCase:
if result[i][0] in grant_name_dict:
tdSql.checkEqual(result[i][1], grant_name_dict[result[i][0]])
index += 1
tdSql.checkEqual(index, 24)
tdSql.checkEqual(index, len(grant_name_dict))
tdSql.query(f'select * from information_schema.ins_grants_logs')
result = tdSql.queryResult
tdSql.checkEqual(True, len(result) >= 0)

View File

@ -186,17 +186,17 @@ class TDTestCase:
tdSql.query('show dnode 1 variables')
for i in tdSql.queryResult:
if i[1].lower() == "gitinfo":
taosd_gitinfo_sql = f"gitinfo: {i[2]}"
taosd_gitinfo_sql = f"git: {i[2]}"
taos_gitinfo_sql = ''
tdSql.query('show local variables')
for i in tdSql.queryResult:
if i[0].lower() == "gitinfo":
taos_gitinfo_sql = f"gitinfo: {i[1]}"
taos_gitinfo_sql = f"git: {i[1]}"
taos_info = os.popen('taos -V').read()
taos_gitinfo = re.findall("^gitinfo.*",taos_info,re.M)
taos_gitinfo = re.findall("^git: .*",taos_info,re.M)
tdSql.checkEqual(taos_gitinfo_sql,taos_gitinfo[0])
taosd_info = os.popen('taosd -V').read()
taosd_gitinfo = re.findall("^gitinfo.*",taosd_info,re.M)
taosd_gitinfo = re.findall("^git: .*",taosd_info,re.M)
tdSql.checkEqual(taosd_gitinfo_sql,taosd_gitinfo[0])
def show_base(self):

View File

@ -5,6 +5,7 @@ import time
import socket
import os
import platform
import re
if platform.system().lower() == 'windows':
import wexpect as taosExpect
else:
@ -371,10 +372,11 @@ class TDTestCase:
if retCode != "TAOS_OK":
tdLog.exit("taos -V fail")
version = 'version: ' + version
retVal = retVal.replace("\n", "")
retVal = retVal.replace("\r", "")
if retVal.startswith(version) == False:
version = 'taos version: ' + version
# retVal = retVal.replace("\n", "")
# retVal = retVal.replace("\r", "")
taosVersion = re.findall((f'^%s'%(version)), retVal,re.M)
if len(taosVersion) == 0:
print ("return version: [%s]"%retVal)
print ("dict version: [%s]"%version)
tdLog.exit("taos -V version not match")

View File

@ -257,10 +257,10 @@ class TDTestCase:
os.system(f'taos -f {sql_file}')
tdSql.query('select count(c_1) from d2.t2 where c_1 < 10', queryTimes=1)
tdSql.checkData(0, 0, 0)
tdSql.query('select count(c_1), min(c_1),tbname from d2.can partition by tbname', queryTimes=1)
tdSql.query('select count(c_1), min(c_1),tbname from d2.can partition by tbname order by 3', queryTimes=1)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, 't3')
tdSql.checkData(0, 2, 't1')
tdSql.checkData(1, 0, 15)
tdSql.checkData(1, 1, 1471617148940980000)
@ -268,7 +268,7 @@ class TDTestCase:
tdSql.checkData(2, 0, 0)
tdSql.checkData(2, 1, None)
tdSql.checkData(2, 2, 't1')
tdSql.checkData(2, 2, 't3')
def run(self):
self.test_count_with_sma_data()

View File

@ -45,6 +45,9 @@ class TDTestCase:
for ts_value in [self.tdCom.date_time, window_close_ts-1]:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if i == 0 and fill_history_value is not None:
for tbname in [self.ctb_stream_des_table, self.tb_stream_des_table]:
tdSql.query(f'select count(*) from {tbname}', count_expected_res=1)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)

View File

@ -435,12 +435,12 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) {
shell.info.promptSize = strlen(shell.info.promptHeader);
#ifdef TD_ENTERPRISE
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
"version: %s compatible_version: %s\ngitinfo: %s\ngitinfoOfInternal: %s\nbuildInfo: %s", version,
"%s\ntaos version: %s compatible_version: %s\ngit: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, version,
compatible_version, gitinfo, gitinfoOfInternal, buildinfo);
#else
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
"version: %s compatible_version: %s\ngitinfo: %s\nbuildInfo: %s", version, compatible_version, gitinfo,
buildinfo);
"%s\ntaos version: %s compatible_version: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, version,
compatible_version, gitinfo, buildinfo);
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)

View File

@ -1189,12 +1189,11 @@ bool shellGetGrantInfo(char* buf) {
fprintf(stderr, "\r\nFailed to get grant information from server. Abort.\r\n");
exit(0);
}
char serverVersion[32] = {0};
char serverVersion[64] = {0};
char expiretime[32] = {0};
char expired[32] = {0};
memcpy(serverVersion, row[0], fields[0].bytes);
tstrncpy(serverVersion, row[0], 64);
memcpy(expiretime, row[1], fields[1].bytes);
memcpy(expired, row[2], fields[2].bytes);
@ -1202,10 +1201,10 @@ bool shellGetGrantInfo(char* buf) {
community = true;
} else if (strcmp(expiretime, "unlimited") == 0) {
community = false;
sprintf(buf, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo);
sprintf(buf, "Server is %s, %s and will never expire.\r\n", serverVersion, sinfo);
} else {
community = false;
sprintf(buf, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo,
sprintf(buf, "Server is %s, %s and will expire at %s.\r\n", serverVersion, sinfo,
expiretime);
}