refactor: check if the put data into queue is success or failed.

This commit is contained in:
Haojun Liao 2023-09-06 19:39:00 +08:00
parent 8edf86b3bb
commit 2b84e0b02e
1 changed files with 4 additions and 3 deletions

View File

@ -24,7 +24,7 @@ typedef struct STableSinkInfo {
tstr name;
} STableSinkInfo;
static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData);
static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid);
@ -275,7 +275,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
pTask->sinkRecorder.numOfBlocks += 1;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
taosArrayPush(submitReq.aSubmitTbData, &tbData);
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
@ -297,6 +297,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
if (code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks);
} else {
ASSERT(0);
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
}
@ -487,7 +488,7 @@ int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int
return TSDB_CODE_SUCCESS;
}
int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock,
int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock,
SStreamTask* pTask, SSubmitTbData* pTableData) {
int32_t numOfRows = pDataBlock->info.rows;
int32_t vgId = TD_VID(pVnode);