other: rsma submit

This commit is contained in:
Cary Xu 2022-08-11 19:59:47 +08:00
parent 133beb4d49
commit d9c10a2f0f
3 changed files with 32 additions and 30 deletions

View File

@ -246,7 +246,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug // for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);

View File

@ -1875,15 +1875,15 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @param suid * @param suid
* *
*/ */
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) { tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq); int32_t bufSize = sizeof(SSubmitReq);
int32_t sz = 1;
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGetP(pDataBlocks, i))->info; const SDataBlockInfo* pBlkInfo = &pDataBlock->info;
int32_t numOfCols = taosArrayGetSize(pDataBlocks); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum));
bufSize += sizeof(SSubmitBlk); bufSize += sizeof(SSubmitBlk);
} }
@ -1900,7 +1900,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
tdSRowInit(&rb, pTSchema->version); tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGetP(pDataBlocks, i);
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
// int32_t rowSize = pDataBlock->info.rowSize; // int32_t rowSize = pDataBlock->info.rowSize;

View File

@ -615,7 +615,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
while (1) { while (1) {
uint64_t ts; uint64_t ts;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); int32_t code = qExecTaskOpt(taskInfo, pResList, &ts);
if (code < 0) { if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) { if (code == TSDB_CODE_QRY_IN_EXEC) {
break; break;
} else { } else {
@ -627,43 +627,46 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
if (taosArrayGetSize(pResList) == 0) { if (taosArrayGetSize(pResList) == 0) {
if (terrno == 0) { if (terrno == 0) {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); // smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
} else { } else {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
goto _err; goto _err;
} }
break; break;
} else {
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
} }
#if 0 #if 1
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level); snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResList, flag); blockDebugShowDataBlocks(pResList, flag);
#endif #endif
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSubmitReq *pReq = NULL; SSDataBlock *output = taosArrayGetP(pResList, i);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, pResList, pTSchema, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
suid, pItem->level, terrstr()); SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err; goto _err;
} }
SSDataBlock *pBlk = (SSDataBlock *)taosArrayGet(pResList, 0); if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, pBlk->info.version, pReq) < 0) { taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " version %" PRIi64
" failed since %s", smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64,
SMA_VID(pSma), suid, pItem->level, terrstr()); SMA_VID(pSma), suid, pItem->level, output->info.version);
goto _err;
} }
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version %" PRIi64, SMA_VID(pSma),
suid, pItem->level);
taosMemoryFreeClear(pReq);
} }
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
@ -1385,7 +1388,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
} }
_end: _end:
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
} }