fix(stream): add task status check when waiting for the table create.

This commit is contained in:
Haojun Liao 2023-09-06 10:58:33 +08:00
parent 0441c7a870
commit c86eeb3938
7 changed files with 24 additions and 14 deletions

View File

@ -250,7 +250,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);

View File

@ -890,9 +890,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
} }
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pTq->pStreamMeta->vgId;
int32_t code; int32_t code;
SStreamTaskCheckRsp rsp; SStreamTaskCheckRsp rsp;
@ -901,7 +902,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
tDecoderInit(&decoder, (uint8_t*)pReq, len); tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) { if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
return -1; return -1;
} }

View File

@ -487,10 +487,18 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
pTableSinkInfo->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid;
metaReaderClear(&mr); metaReaderClear(&mr);
} }
} else { // not exist, wait and retry } else { // not exist, wait and retry
metaReaderClear(&mr); metaReaderClear(&mr);
taosMsleep(100); if (streamTaskShouldStop(&pTask->status)) {
tqDebug("s-task:%s wait for the table:%s ready before insert data", id, dstTableName); tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
} else {
taosMsleep(100);
tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
}
} }
} }

View File

@ -112,7 +112,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId); tqDebug("vgId:%d no stream tasks existed to run", vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
@ -150,7 +150,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId); tqDebug("vgId:%d no stream tasks existed to run", vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }

View File

@ -756,7 +756,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_VND_STREAM_TASK_CHECK: case TDMT_VND_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP: case TDMT_VND_STREAM_TASK_CHECK_RSP:
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg); return tqProcessStreamTaskCheckRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE: case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:

View File

@ -1591,7 +1591,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
return 0; return 0;
} }
int streamStateOpenBackend(void* backend, SStreamState* pState) { int streamStateOpenBackend(void* backend, SStreamState* pState) {
// qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid); taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendWrapper* handle = backend; SBackendWrapper* handle = backend;
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));

View File

@ -264,9 +264,9 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
qError( // qError(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", // "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); // pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px); streamDataSubmitDestroy(px);
taosFreeQitem(pItem); taosFreeQitem(pItem);
return -1; return -1;
@ -288,8 +288,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pQueue)) { if (streamQueueIsFull(pQueue)) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", // qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); // pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem); destroyStreamDataBlock((SStreamDataBlock*)pItem);
return -1; return -1;
} }