diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 13eda32bfe..846dc987a5 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -309,11 +309,16 @@ static const SSysDbTableSchema smaSchema[] = { static const SSysDbTableSchema transSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "action", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen - {.name = "action_type", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "obj_type", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen - {.name = "result", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen - {.name = "detail", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "oper", .bytes = TSDB_TRANS_OPER_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "stable", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "killable", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "kill_mnode", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; static const SSysDbTableSchema configSchema[] = { @@ -398,11 +403,11 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = { static const SSysDbTableSchema userTransactionDetailSchema[] = { {.name = "transaction_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "action_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - //{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - //{.name = "number_fileset", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - //{.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - //{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "action", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen + {.name = "action_type", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "obj_type", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen + {.name = "result", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen + {.name = "detail", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},//TODO dmchen }; static const SSysDbTableSchema anodesSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index b002a91134..af9ffd4d98 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -2071,7 +2071,6 @@ void mndTransPullup(SMnode *pMnode) { taosArrayDestroy(pArray); } -/* static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -2226,19 +2225,18 @@ _OVER: pShow->numOfRows += numOfRows; return numOfRows; } -*/ -static int32_t mndShowTransCommonColumns(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, STrans *pTrans, - int32_t numOfRows, int32_t *cols) { +static int32_t mndShowTransCommonColumns(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, + int32_t transactionId, int32_t curActionId, int32_t numOfRows, int32_t *cols) { int32_t code = 0; int32_t lino = 0; int32_t len = 0; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++); - TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->id, false), &lino, _OVER); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&transactionId, false), &lino, _OVER); char action[2048 + 1] = {0}; // TODO dmchen - if (pTrans->lastAction == pAction->id) { + if (curActionId == pAction->id) { len += snprintf(action + len, sizeof(action) - len, "%s:%d(cur)", mndTransStr(pAction->stage), pAction->id); } else { len += snprintf(action + len, sizeof(action) - len, "%s:%d", mndTransStr(pAction->stage), pAction->id); @@ -2255,124 +2253,197 @@ _OVER: return code; } -static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { +static int32_t mndShowTransAction(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, int32_t transactionId, + int32_t curActionId, int32_t rows, int32_t numOfRows) { + int32_t code = 0; + int32_t lino = 0; + int32_t len = 0; + int32_t cols = 0; + + cols = 0; + + mndShowTransCommonColumns(pShow, pBlock, pAction, transactionId, curActionId, numOfRows, &cols); + + if (pAction->actionType == TRANS_ACTION_MSG) { + char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen + strcpy(objType, TMSG_INFO(pAction->msgType)); + char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER); + + char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen + int32_t len = 0; + len += snprintf(result + len, sizeof(result) - len, "snt:%d, rec:%d", pAction->msgSent, pAction->msgReceived); + len += snprintf(result + len, sizeof(result) - len, ", errCode:0x%x(%s)", pAction->errCode & 0xFFFF, + tstrerror(pAction->errCode)); + char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER); + + char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen + len = 0; + + SEpSet epset = pAction->epSet; + if (epset.numOfEps > 0) { + len += snprintf(detail + len, sizeof(detail) - len, "numOfEps:%d inUse:%d ", epset.numOfEps, epset.inUse); + for (int32_t i = 0; i < epset.numOfEps; ++i) { + len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port); + } + } + + char bufStart[40] = {0}; + taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI); + + char bufEnd[40] = {0}; + taosFormatUtcTime(bufEnd, sizeof(bufEnd), pAction->endTime, TSDB_TIME_PRECISION_MILLI); + + len += snprintf(detail + len, sizeof(detail) - len, "startTime:%s, endTime:%s, ", bufStart, bufEnd); + + char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER); + + } else { + int32_t len = 0; + + char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen + + if (pAction->pRaw->type == SDB_VGROUP) { + SSdbRow *pRow = mndVgroupActionDecode(pAction->pRaw); + SVgObj *pVgroup = sdbGetRowObj(pRow); + len += snprintf(objType + len, sizeof(objType) - len, "%s(%d)", sdbTableName(pAction->pRaw->type), pVgroup->vgId); + taosMemoryFreeClear(pRow); + } else { + strcpy(objType, sdbTableName(pAction->pRaw->type)); + } + + char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER); + + char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen + len = 0; + len += snprintf(result + len, sizeof(result) - len, "rawWritten:%d", pAction->rawWritten); + char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER); + + char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen + len = 0; + len += snprintf(detail + len, sizeof(detail) - len, "sdbStatus:%s", sdbStatusName(pAction->pRaw->status)); + char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER); + } + +_OVER: + if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code)); + return code; +} + +static SArray *mndTransGetAction(STrans *pTrans, ETrnStage stage) { + if (stage == TRN_STAGE_PREPARE) { + return pTrans->prepareActions; + } + if (stage == TRN_STAGE_REDO_ACTION) { + return pTrans->redoActions; + } + if (stage == TRN_STAGE_COMMIT_ACTION) { + return pTrans->commitActions; + } + if (stage == TRN_STAGE_UNDO_ACTION) { + return pTrans->undoActions; + } + return NULL; +} + +typedef struct STransDetailIter { + void *pIter; + STrans *pTrans; + ETrnStage stage; + int32_t num; +} STransDetailIter; + +static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; - STrans *pTrans = NULL; - int32_t cols = 0; + int32_t code = 0; int32_t lino = 0; mInfo("start to mndRetrieveTransDetail, rows:%d, pShow->numOfRows:%d, pShow->pIter:%p", rows, pShow->numOfRows, pShow->pIter); + if (pShow->pIter == NULL) { + pShow->pIter = taosMemoryMalloc(sizeof(STransDetailIter)); + if (pShow->pIter == NULL) { + mError("failed to malloc for pShow->pIter"); + return 0; + } + memset(pShow->pIter, 0, sizeof(STransDetailIter)); + } + + STransDetailIter *pShowIter = (STransDetailIter *)pShow->pIter; + while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans); - mInfo("pShow->pIter:%p, pTrans:%p", pShow->pIter, pTrans); - if (pShow->pIter == NULL) break; + if (pShowIter->pTrans == NULL) { + pShowIter->pIter = sdbFetch(pSdb, SDB_TRANS, pShowIter->pIter, (void **)&(pShowIter->pTrans)); + mInfo("pShow->pIter:%p, pTrans:%p", pShowIter->pIter, pShowIter->pTrans); + if (pShowIter->pIter == NULL) break; - if (pTrans->stage == TRN_STAGE_REDO_ACTION) { - mInfo("redoActions num:%" PRId64, taosArrayGetSize(pTrans->redoActions)); + int32_t actionNum = 0; + STrans *pTrans = pShowIter->pTrans; - for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - - cols = 0; - - mndShowTransCommonColumns(pShow, pBlock, pAction, pTrans, numOfRows, &cols); - - if (pAction->actionType == TRANS_ACTION_MSG) { - char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen - strcpy(objType, TMSG_INFO(pAction->msgType)); - char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes); - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), pTrans, &lino, - _OVER); - - char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen - int32_t len = 0; - len += snprintf(result + len, sizeof(result) - len, "snt:%d, rec:%d", pAction->msgSent, pAction->msgReceived); - len += snprintf(result + len, sizeof(result) - len, ", errCode:0x%x(%s)", pAction->errCode & 0xFFFF, - tstrerror(pAction->errCode)); - char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), pTrans, &lino, - _OVER); - - char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen - len = 0; - - SEpSet epset = pAction->epSet; - if (epset.numOfEps > 0) { - len += snprintf(detail + len, sizeof(detail) - len, "numOfEps:%d inUse:%d ", epset.numOfEps, epset.inUse); - for (int32_t i = 0; i < epset.numOfEps; ++i) { - len += - snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port); - } - } - - char bufStart[40] = {0}; - taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI); - - char bufEnd[40] = {0}; - taosFormatUtcTime(bufEnd, sizeof(bufEnd), pAction->endTime, TSDB_TIME_PRECISION_MILLI); - - len += snprintf(detail + len, sizeof(detail) - len, "startTime:%s, endTime:%s, ", bufStart, bufEnd); - - char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), pTrans, &lino, - _OVER); - - } else { - int32_t len = 0; - - char objType[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen - - if (pAction->pRaw->type == SDB_VGROUP) { - SSdbRow *pRow = mndVgroupActionDecode(pAction->pRaw); - SVgObj *pVgroup = sdbGetRowObj(pRow); - len += snprintf(objType + len, sizeof(objType) - len, "%s(%d)", sdbTableName(pAction->pRaw->type), - pVgroup->vgId); - taosMemoryFreeClear(pRow); - } else { - strcpy(objType, sdbTableName(pAction->pRaw->type)); - } - - char objTypeVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes); - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), pTrans, &lino, - _OVER); - - char result[TSDB_TRANS_ERROR_LEN + 1] = {0}; // TODO dmchen - len = 0; - len += snprintf(result + len, sizeof(result) - len, "rawWritten:%d", pAction->rawWritten); - char resultVStr[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), pTrans, &lino, - _OVER); - - char detail[TSDB_TRANS_ERROR_LEN] = {0}; // TODO dmchen - len = 0; - len += snprintf(detail + len, sizeof(detail) - len, "sdbStatus:%s", sdbStatusName(pAction->pRaw->status)); - char detailVStr[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), pTrans, &lino, - _OVER); + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { + SArray *pActions = mndTransGetAction(pTrans, pTrans->stage); + actionNum = taosArrayGetSize(pActions); + mInfo("stage:%s, Actions num:%d", mndTransStr(pTrans->stage), actionNum); + for (int32_t i = 0; i < actionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + mndShowTransAction(pShow, pBlock, pAction, pTrans->id, pTrans->lastAction, rows, numOfRows); + numOfRows++; + if (numOfRows >= rows) break; } + } + if (numOfRows == actionNum) { + sdbRelease(pSdb, pTrans); + pShowIter->pTrans = NULL; + } else { + pShowIter->pTrans = pTrans; + pShowIter->stage = pTrans->stage; + pShowIter->num = numOfRows; + } + } else { + int32_t actionNum = 0; + STrans *pTrans = pShowIter->pTrans; + SArray *pActions = mndTransGetAction(pTrans, pShowIter->stage); + actionNum = taosArrayGetSize(pActions); + mInfo("stage:%s, Actions num:%d", mndTransStr(pShowIter->stage), actionNum); + + for (int32_t i = pShowIter->num; i < actionNum; ++i) { + STransAction *pAction = taosArrayGet(pShowIter->pTrans->redoActions, i); + mndShowTransAction(pShow, pBlock, pAction, pTrans->id, pTrans->lastAction, rows, numOfRows); numOfRows++; if (numOfRows >= rows) break; } - sdbRelease(pSdb, pTrans); + if (pShow->numOfRows + numOfRows == actionNum) { + sdbRelease(pSdb, pTrans); + pShowIter->pTrans = NULL; + } else { + pShowIter->pTrans = pTrans; + pShowIter->stage = pTrans->stage; + pShowIter->num = numOfRows; + } + break; } } @@ -2387,8 +2458,6 @@ _OVER: return numOfRows; } -static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { return 0; } - static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetchByType(pSdb, pIter, SDB_TRANS);