refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-30 13:25:39 +08:00
parent d3e676b084
commit 4c27af9b13
3 changed files with 12 additions and 4 deletions

View File

@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "checkpoint_interval", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};

View File

@ -1503,6 +1503,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
// sink_quota
char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
sinkQuota[0] = '0';
char dstStr[20] = {0};
@ -1510,6 +1511,12 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
// checkpoint interval
char tmp[20 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tmp, "none")
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
// checkpoint backup type
char backup[20 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(backup, "none")

View File

@ -333,8 +333,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
}
while (j < newLen && k < oldLen) {
SRow* pNewRow = TARRAY_GET_ELEM(pNew->aRowP, j);
SRow* pOldRow = TARRAY_GET_ELEM(pExisted->aRowP, k);
SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
if (pNewRow->ts <= pOldRow->ts) {
taosArrayPush(pFinal, &pNewRow);
@ -374,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
}
while (j < newLen) {
SRow* pRow = TARRAY_GET_ELEM(pNew->aRowP, j++);
SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
taosArrayPush(pFinal, &pRow);
}
while (k < oldLen) {
SRow* pRow = TARRAY_GET_ELEM(pExisted->aRowP, k++);
SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
taosArrayPush(pFinal, &pRow);
}