dmchen/trans-improve-show-all

This commit is contained in:
dmchen 2024-11-08 15:15:06 +08:00
parent d6d5be570d
commit 50affc90f8
2 changed files with 192 additions and 118 deletions

View File

@ -309,11 +309,16 @@ static const SSysDbTableSchema smaSchema[] = {
static const SSysDbTableSchema transSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "action", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
{.name = "action_type", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "obj_type", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
{.name = "result", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
{.name = "detail", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "oper", .bytes = TSDB_TRANS_OPER_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stable", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "killable", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "kill_mnode", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysDbTableSchema configSchema[] = {
@ -398,11 +403,11 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
static const SSysDbTableSchema userTransactionDetailSchema[] = {
{.name = "transaction_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "action_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
//{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
//{.name = "number_fileset", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
//{.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
//{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "action", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
{.name = "action_type", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "obj_type", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
{.name = "result", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
{.name = "detail", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen
};
static const SSysDbTableSchema anodesSchema[] = {

View File

@ -2071,7 +2071,6 @@ void mndTransPullup(SMnode *pMnode) {
taosArrayDestroy(pArray);
}
/*
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -2226,19 +2225,18 @@ _OVER:
pShow->numOfRows += numOfRows;
return numOfRows;
}
*/
static int32_t mndShowTransCommonColumns(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, STrans *pTrans,
int32_t numOfRows, int32_t *cols) {
static int32_t mndShowTransCommonColumns(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction,
int32_t transactionId, int32_t curActionId, int32_t numOfRows, int32_t *cols) {
int32_t code = 0;
int32_t lino = 0;
int32_t len = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->id, false), &lino, _OVER);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&transactionId, false), &lino, _OVER);
char action[2048 + 1] = {0}; // TODO dmchen
if (pTrans->lastAction == pAction->id) {
if (curActionId == pAction->id) {
len += snprintf(action + len, sizeof(action) - len, "%s:%d(cur)", mndTransStr(pAction->stage), pAction->id);
} else {
len += snprintf(action + len, sizeof(action) - len, "%s:%d", mndTransStr(pAction->stage), pAction->id);
@ -2255,124 +2253,197 @@ _OVER:
return code;
}
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
static int32_t mndShowTransAction(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, int32_t transactionId,
int32_t curActionId, int32_t rows, int32_t numOfRows) {
int32_t code = 0;
int32_t lino = 0;
int32_t len = 0;
int32_t cols = 0;
cols = 0;
mndShowTransCommonColumns(pShow, pBlock, pAction, transactionId, curActionId, numOfRows, &cols);
if (pAction->actionType == TRANS_ACTION_MSG) {
char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
strcpy(objType, TMSG_INFO(pAction->msgType));
char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER);
char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
int32_t len = 0;
len += snprintf(result + len, sizeof(result) - len, "snt:%d, rec:%d", pAction->msgSent, pAction->msgReceived);
len += snprintf(result + len, sizeof(result) - len, ", errCode:0x%x(%s)", pAction->errCode & 0xFFFF,
tstrerror(pAction->errCode));
char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER);
char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen
len = 0;
SEpSet epset = pAction->epSet;
if (epset.numOfEps > 0) {
len += snprintf(detail + len, sizeof(detail) - len, "numOfEps:%d inUse:%d ", epset.numOfEps, epset.inUse);
for (int32_t i = 0; i < epset.numOfEps; ++i) {
len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
}
}
char bufStart[40] = {0};
taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI);
char bufEnd[40] = {0};
taosFormatUtcTime(bufEnd, sizeof(bufEnd), pAction->endTime, TSDB_TIME_PRECISION_MILLI);
len += snprintf(detail + len, sizeof(detail) - len, "startTime:%s, endTime:%s, ", bufStart, bufEnd);
char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER);
} else {
int32_t len = 0;
char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
if (pAction->pRaw->type == SDB_VGROUP) {
SSdbRow *pRow = mndVgroupActionDecode(pAction->pRaw);
SVgObj *pVgroup = sdbGetRowObj(pRow);
len += snprintf(objType + len, sizeof(objType) - len, "%s(%d)", sdbTableName(pAction->pRaw->type), pVgroup->vgId);
taosMemoryFreeClear(pRow);
} else {
strcpy(objType, sdbTableName(pAction->pRaw->type));
}
char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER);
char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
len = 0;
len += snprintf(result + len, sizeof(result) - len, "rawWritten:%d", pAction->rawWritten);
char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER);
char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen
len = 0;
len += snprintf(detail + len, sizeof(detail) - len, "sdbStatus:%s", sdbStatusName(pAction->pRaw->status));
char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER);
}
_OVER:
if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
return code;
}
static SArray *mndTransGetAction(STrans *pTrans, ETrnStage stage) {
if (stage == TRN_STAGE_PREPARE) {
return pTrans->prepareActions;
}
if (stage == TRN_STAGE_REDO_ACTION) {
return pTrans->redoActions;
}
if (stage == TRN_STAGE_COMMIT_ACTION) {
return pTrans->commitActions;
}
if (stage == TRN_STAGE_UNDO_ACTION) {
return pTrans->undoActions;
}
return NULL;
}
typedef struct STransDetailIter {
void *pIter;
STrans *pTrans;
ETrnStage stage;
int32_t num;
} STransDetailIter;
static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
STrans *pTrans = NULL;
int32_t cols = 0;
int32_t code = 0;
int32_t lino = 0;
mInfo("start to mndRetrieveTransDetail, rows:%d, pShow->numOfRows:%d, pShow->pIter:%p", rows, pShow->numOfRows,
pShow->pIter);
if (pShow->pIter == NULL) {
pShow->pIter = taosMemoryMalloc(sizeof(STransDetailIter));
if (pShow->pIter == NULL) {
mError("failed to malloc for pShow->pIter");
return 0;
}
memset(pShow->pIter, 0, sizeof(STransDetailIter));
}
STransDetailIter *pShowIter = (STransDetailIter *)pShow->pIter;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans);
mInfo("pShow->pIter:%p, pTrans:%p", pShow->pIter, pTrans);
if (pShow->pIter == NULL) break;
if (pShowIter->pTrans == NULL) {
pShowIter->pIter = sdbFetch(pSdb, SDB_TRANS, pShowIter->pIter, (void **)&(pShowIter->pTrans));
mInfo("pShow->pIter:%p, pTrans:%p", pShowIter->pIter, pShowIter->pTrans);
if (pShowIter->pIter == NULL) break;
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
mInfo("redoActions num:%" PRId64, taosArrayGetSize(pTrans->redoActions));
int32_t actionNum = 0;
STrans *pTrans = pShowIter->pTrans;
for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
cols = 0;
mndShowTransCommonColumns(pShow, pBlock, pAction, pTrans, numOfRows, &cols);
if (pAction->actionType == TRANS_ACTION_MSG) {
char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
strcpy(objType, TMSG_INFO(pAction->msgType));
char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), pTrans, &lino,
_OVER);
char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
int32_t len = 0;
len += snprintf(result + len, sizeof(result) - len, "snt:%d, rec:%d", pAction->msgSent, pAction->msgReceived);
len += snprintf(result + len, sizeof(result) - len, ", errCode:0x%x(%s)", pAction->errCode & 0xFFFF,
tstrerror(pAction->errCode));
char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), pTrans, &lino,
_OVER);
char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen
len = 0;
SEpSet epset = pAction->epSet;
if (epset.numOfEps > 0) {
len += snprintf(detail + len, sizeof(detail) - len, "numOfEps:%d inUse:%d ", epset.numOfEps, epset.inUse);
for (int32_t i = 0; i < epset.numOfEps; ++i) {
len +=
snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
}
}
char bufStart[40] = {0};
taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI);
char bufEnd[40] = {0};
taosFormatUtcTime(bufEnd, sizeof(bufEnd), pAction->endTime, TSDB_TIME_PRECISION_MILLI);
len += snprintf(detail + len, sizeof(detail) - len, "startTime:%s, endTime:%s, ", bufStart, bufEnd);
char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), pTrans, &lino,
_OVER);
} else {
int32_t len = 0;
char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
if (pAction->pRaw->type == SDB_VGROUP) {
SSdbRow *pRow = mndVgroupActionDecode(pAction->pRaw);
SVgObj *pVgroup = sdbGetRowObj(pRow);
len += snprintf(objType + len, sizeof(objType) - len, "%s(%d)", sdbTableName(pAction->pRaw->type),
pVgroup->vgId);
taosMemoryFreeClear(pRow);
} else {
strcpy(objType, sdbTableName(pAction->pRaw->type));
}
char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), pTrans, &lino,
_OVER);
char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen
len = 0;
len += snprintf(result + len, sizeof(result) - len, "rawWritten:%d", pAction->rawWritten);
char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), pTrans, &lino,
_OVER);
char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen
len = 0;
len += snprintf(detail + len, sizeof(detail) - len, "sdbStatus:%s", sdbStatusName(pAction->pRaw->status));
char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), pTrans, &lino,
_OVER);
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
SArray *pActions = mndTransGetAction(pTrans, pTrans->stage);
actionNum = taosArrayGetSize(pActions);
mInfo("stage:%s, Actions num:%d", mndTransStr(pTrans->stage), actionNum);
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
mndShowTransAction(pShow, pBlock, pAction, pTrans->id, pTrans->lastAction, rows, numOfRows);
numOfRows++;
if (numOfRows >= rows) break;
}
}
if (numOfRows == actionNum) {
sdbRelease(pSdb, pTrans);
pShowIter->pTrans = NULL;
} else {
pShowIter->pTrans = pTrans;
pShowIter->stage = pTrans->stage;
pShowIter->num = numOfRows;
}
} else {
int32_t actionNum = 0;
STrans *pTrans = pShowIter->pTrans;
SArray *pActions = mndTransGetAction(pTrans, pShowIter->stage);
actionNum = taosArrayGetSize(pActions);
mInfo("stage:%s, Actions num:%d", mndTransStr(pShowIter->stage), actionNum);
for (int32_t i = pShowIter->num; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pShowIter->pTrans->redoActions, i);
mndShowTransAction(pShow, pBlock, pAction, pTrans->id, pTrans->lastAction, rows, numOfRows);
numOfRows++;
if (numOfRows >= rows) break;
}
sdbRelease(pSdb, pTrans);
if (pShow->numOfRows + numOfRows == actionNum) {
sdbRelease(pSdb, pTrans);
pShowIter->pTrans = NULL;
} else {
pShowIter->pTrans = pTrans;
pShowIter->stage = pTrans->stage;
pShowIter->num = numOfRows;
}
break;
}
}
@ -2387,8 +2458,6 @@ _OVER:
return numOfRows;
}
static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { return 0; }
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetchByType(pSdb, pIter, SDB_TRANS);