fix(stream): add some logs, and remove invalid assert.
This commit is contained in:
parent
124e1ed1ab
commit
27e00b1f19
|
@ -636,7 +636,7 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
||||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -158,8 +158,8 @@ static const SSysDbTableSchema streamSchema[] = {
|
||||||
|
|
||||||
static const SSysDbTableSchema streamTaskSchema[] = {
|
static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "task_id", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "node_type", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
|
|
@ -41,7 +41,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp, false);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
@ -228,7 +228,7 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1353,7 +1353,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1563,7 +1563,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp, false);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
|
|
@ -210,7 +210,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||||
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
||||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
||||||
int32_t status = 0;
|
int32_t status = 0;
|
||||||
|
@ -219,7 +219,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
ASSERT(pInfo != NULL);
|
ASSERT(pInfo != NULL);
|
||||||
|
|
||||||
if (!pTask->pMeta->leader) {
|
if (!pTask->pMeta->leader) {
|
||||||
ASSERT(0);
|
stError("s-task:%s task on follower received dispatch msgs, should discard it, not now", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// upstream task has restarted/leader-follower switch/transferred to other dnodes
|
// upstream task has restarted/leader-follower switch/transferred to other dnodes
|
||||||
|
|
Loading…
Reference in New Issue