enh(stream): pack multiple data blocks into one submit message.

This commit is contained in:
Haojun Liao 2023-09-06 18:58:47 +08:00
parent 926cc01cff
commit a82027f492
1 changed file with 82 additions and 73 deletions

View File

@ -25,9 +25,10 @@ typedef struct STableSinkInfo {
} STableSinkInfo; } STableSinkInfo;
static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask); SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData);
static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid); int64_t suid);
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
@ -142,17 +143,25 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
int32_t vgId = TD_VID(pVnode); int32_t vgId = TD_VID(pVnode);
int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t numOfBlocks = taosArrayGetSize(pBlocks);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const char* id = pTask->id.idStr;
if (pTask->tsInfo.sinkStart == 0) { if (pTask->tsInfo.sinkStart == 0) {
pTask->tsInfo.sinkStart = taosGetTimestampMs(); pTask->tsInfo.sinkStart = taosGetTimestampMs();
} }
tqInfo("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, id, numOfBlocks);
bool hasSubmit = false;
SArray* tagArray = NULL; SArray* tagArray = NULL;
SArray* pVals = NULL; SArray* pVals = NULL;
SArray* crTblArray = NULL; SArray* crTblArray = NULL;
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
@ -262,19 +271,46 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) { } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue; continue;
} else { } else {
hasSubmit = true;
pTask->sinkRecorder.numOfPackages += 1; pTask->sinkRecorder.numOfPackages += 1;
code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask);
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
taosArrayPush(submitReq.aSubmitTbData, &tbData);
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
} }
} }
if (hasSubmit) {
int32_t len = 0;
void* pBuf = NULL;
code = tqBuildSubmitReq(&submitReq, vgId, &pBuf, &len);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
goto _end;
}
SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s vgId:%d send submit %d blocks(%d rows) into dstTables completed", id, vgId);
} else {
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
}
if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) {
SSinkTaskRecorder* pRec = &pTask->sinkRecorder; SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.",
pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows, pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows,
(taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0);
} }
} else {
tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr); tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
}
_end: _end:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
@ -409,30 +445,21 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi
return code; return code;
} }
static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void** pMsg, int32_t* msgLen) { int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
int32_t code = 0; int32_t code = 0;
void* pBuf = NULL; void* pBuf = NULL;
*msgLen = 0; *msgLen = 0;
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(pTableData->aRowP);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(submitReq.aSubmitTbData, pTableData);
// encode // encode
int32_t len = 0; int32_t len = 0;
tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
SEncoder encoder; SEncoder encoder;
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -441,17 +468,17 @@ static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void**
((SSubmitReq2Msg*)pBuf)->version = htobe64(1); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
tEncoderClear(&encoder); tEncoderClear(&encoder);
rpcFreeCont(pBuf); rpcFreeCont(pBuf);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
return code; return code;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
*msgLen = len; *msgLen = len;
*pMsg = pBuf; *pMsg = pBuf;
@ -459,7 +486,7 @@ static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void**
} }
int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock,
SStreamTask* pTask) { SStreamTask* pTask, SSubmitTbData* pTableData) {
int32_t numOfRows = pDataBlock->info.rows; int32_t numOfRows = pDataBlock->info.rows;
int32_t vgId = TD_VID(pVnode); int32_t vgId = TD_VID(pVnode);
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;
@ -468,15 +495,14 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
SArray* pVals = NULL; SArray* pVals = NULL;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
tqDebug("s-task:%s sink data pipeline, build submit msg from %d-th resBlock, including %d rows, dst suid:%" PRId64,
id, blockIndex + 1, numOfRows, suid); id, blockIndex + 1, numOfRows, suid);
tbData.aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
if (tbData.aRowP == NULL || pVals == NULL) { if (pTableData->aRowP == NULL || pVals == NULL) {
taosArrayDestroy(tbData.aRowP); taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -517,13 +543,21 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
} }
if (exist) { if (exist) {
tbData.uid = pTableSinkInfo->uid; pTableData->uid = pTableSinkInfo->uid;
if (tbData.uid == 0) { if (pTableData->uid == 0) {
tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
} }
while (pTableSinkInfo->uid == 0) { while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(&pTask->status)) {
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
}
// wait for the table to be created // wait for the table to be created
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0); metaReaderDoInit(&mr, pVnode->pMeta, 0);
@ -534,31 +568,23 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
if (!isValid) { // not valid table, ignore it if (!isValid) { // not valid table, ignore it
metaReaderClear(&mr); metaReaderClear(&mr);
taosArrayDestroy(tbData.aRowP); taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
tbData.uid = mr.me.uid; pTableData->uid = mr.me.uid;
pTableSinkInfo->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid;
metaReaderClear(&mr); metaReaderClear(&mr);
} }
} else { // not exist, wait and retry } else { // not exist, wait and retry
metaReaderClear(&mr); metaReaderClear(&mr);
if (streamTaskShouldStop(&pTask->status)) {
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
} else {
taosMsleep(100); taosMsleep(100);
tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
} }
} }
}
} else { } else {
// todo: this check is not safe, and results in losing of submit message from WAL. // todo: this check is not safe, and results in losing of submit message from WAL.
@ -576,12 +602,12 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
if (tbData.pCreateTbReq == NULL) { if (pTableData->pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
taosArrayDestroy(tbData.aRowP); taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
return terrno; return terrno;
@ -593,14 +619,14 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
if (!isValid) { if (!isValid) {
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo); taosMemoryFree(pTableSinkInfo);
taosArrayDestroy(tbData.aRowP); taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
tbData.uid = mr.me.uid; pTableData->uid = mr.me.uid;
metaReaderClear(&mr); metaReaderClear(&mr);
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, tbData.uid, id); doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id);
} }
} }
} }
@ -648,35 +674,18 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
SRow* pRow = NULL; SRow* pRow = NULL;
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(tbData.aRowP); taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
return code; return code;
} }
ASSERT(pRow); ASSERT(pRow);
taosArrayPush(tbData.aRowP, &pRow); taosArrayPush(pTableData->aRowP, &pRow);
} }
int32_t len = 0; tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
void* pBuf = NULL;
code = tqBuildSubmitReq(&tbData, vgId, &pBuf, &len);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pVals);
return code;
}
pTask->sinkRecorder.numOfRows += numOfRows;
SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
} else {
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
}
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
return code; return code;