Merge pull request #26976 from taosdata/fix/syntax
fix(tsdb): return error.
This commit is contained in:
commit
d6f8d79223
|
@ -305,8 +305,12 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
|||
if (pCheck->ntbUid == tbUid) {
|
||||
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
|
||||
if (forbidColId == colId) {
|
||||
int16_t* pForbidColId = taosArrayGet(pCheck->colIdList, i);
|
||||
if (pForbidColId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((*pForbidColId) == colId) {
|
||||
taosHashCancelIterate(pTq->pCheckInfo, pIter);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -54,6 +54,10 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
|
|||
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
|
||||
if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);
|
||||
|
||||
for (int32_t row = 0; row < totalRows; row++) {
|
||||
|
@ -297,6 +301,9 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
|||
} else {
|
||||
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
|
||||
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
|
||||
if (pTagData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
|
||||
void* pData = colDataGetData(pTagData, rowId);
|
||||
|
@ -329,6 +336,9 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
|||
uint64_t gid = pDataBlock->info.id.groupId;
|
||||
if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
|
||||
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||
if (pGpIdColInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// todo remove this
|
||||
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
|
||||
|
@ -656,6 +666,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
|||
// primary timestamp column, for debug purpose
|
||||
if (k == 0) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||
if (pColData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ts = *(int64_t*)colDataGetData(pColData, j);
|
||||
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
|
||||
|
||||
|
@ -682,6 +696,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
|||
}
|
||||
} else {
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||
if (pColData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (colDataIsNull_s(pColData, j)) {
|
||||
if (pCol->flags & COL_IS_KEY) {
|
||||
qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
|
||||
|
@ -993,6 +1011,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
}
|
||||
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
|
||||
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
|
@ -1059,6 +1081,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
}
|
||||
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||
continue;
|
||||
}
|
||||
|
@ -1110,6 +1136,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
}
|
||||
|
||||
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
|
||||
if (pExisted == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
code = doMergeExistedRows(pExisted, &tbData, id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
|
@ -1137,6 +1167,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SSDataBlock* p = taosArrayGet(pBlocks, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -357,9 +357,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
STaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
if (pTaskId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
if (pTask == NULL) {
|
||||
if (pTask == NULL || code != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -164,17 +164,19 @@ static void tRowGetPrimaryKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
|||
|
||||
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
|
||||
int32_t numOfCols) {
|
||||
bool initSucc = true;
|
||||
|
||||
pSupInfo->pk.pk = 0;
|
||||
pSupInfo->numOfPks = 0;
|
||||
pSupInfo->pkSrcSlot = -1;
|
||||
pSupInfo->pkDstSlot = -1;
|
||||
|
||||
pSupInfo->smaValid = true;
|
||||
pSupInfo->numOfCols = numOfCols;
|
||||
|
||||
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
|
||||
if (pSupInfo->colId == NULL) {
|
||||
taosMemoryFree(pSupInfo->colId);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
|
||||
|
@ -187,7 +189,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
|
|||
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
|
||||
if (pSupInfo->buildBuf[i] == NULL) {
|
||||
tsdbError("failed to prepare memory for set columnId slot list, size:%d, code:out of memory", pCols[i].bytes);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
initSucc = false;
|
||||
}
|
||||
} else {
|
||||
pSupInfo->buildBuf[i] = NULL;
|
||||
|
@ -201,7 +203,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
|
|||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return (initSucc)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
|
||||
|
|
|
@ -943,7 +943,7 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
|
|||
if (*ppMemDelData == NULL) {
|
||||
*ppMemDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
if (*ppMemDelData == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -957,7 +957,7 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
|
|||
if (p->version <= ver) {
|
||||
void* px = taosArrayPush(pMemDelData, p);
|
||||
if (px == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -972,7 +972,7 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
|
|||
if (p->version <= ver) {
|
||||
void* px = taosArrayPush(pMemDelData, p);
|
||||
if (px == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
p = p->pNext;
|
||||
|
|
|
@ -910,6 +910,7 @@ void streamBackendCleanup(void* arg) {
|
|||
|
||||
if (pHandle->db) {
|
||||
rocksdb_close(pHandle->db);
|
||||
pHandle->db = NULL;
|
||||
}
|
||||
rocksdb_options_destroy(pHandle->dbOpt);
|
||||
rocksdb_env_destroy(pHandle->env);
|
||||
|
@ -2508,6 +2509,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
|
|||
}
|
||||
|
||||
rocksdb_close(pTaskDb->db);
|
||||
pTaskDb->db = NULL;
|
||||
|
||||
if (cfNames != NULL) {
|
||||
rocksdb_list_column_families_destroy(cfNames, nCf);
|
||||
|
@ -2617,6 +2619,7 @@ void taskDbDestroy(void* pDb, bool flush) {
|
|||
|
||||
if (wrapper->db) {
|
||||
rocksdb_close(wrapper->db);
|
||||
wrapper->db = NULL;
|
||||
}
|
||||
|
||||
rocksdb_options_destroy(wrapper->dbOpt);
|
||||
|
|
|
@ -133,6 +133,9 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
|
|||
|
||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
|
||||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);
|
||||
|
@ -370,6 +373,10 @@ void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
|
|||
bool existed = false;
|
||||
for (int i = 0; i < num; ++i) {
|
||||
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->nodeId == nodeId) {
|
||||
existed = true;
|
||||
break;
|
||||
|
@ -412,6 +419,10 @@ void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatus
|
|||
*pStatusInfo = NULL;
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->taskId == taskId) {
|
||||
*pStatusInfo = p;
|
||||
}
|
||||
|
@ -546,6 +557,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
|
||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->taskId == pVgInfo->taskId) {
|
||||
setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);
|
||||
|
@ -566,6 +580,10 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i
|
|||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||
(*numOfReady) += 1;
|
||||
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||
|
@ -603,8 +621,12 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
|||
pInfo->timeoutStartTs = taosGetTimestampMs();
|
||||
|
||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||
int32_t* px = taosArrayGet(pTimeoutList, i);
|
||||
if (px == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t taskId = *px;
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
if (p != NULL) {
|
||||
|
@ -620,9 +642,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
|||
pInfo->timeoutRetryCount = 0;
|
||||
|
||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||
int32_t* pTaskId = taosArrayGet(pTimeoutList, i);
|
||||
if (pTaskId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
findCheckRspStatus(pInfo, *pTaskId, &p);
|
||||
if (p != NULL) {
|
||||
addIntoNodeUpdateList(pTask, p->vgId);
|
||||
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
|
||||
|
@ -647,10 +673,13 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
|||
|
||||
// reset the info, and send the check msg to failure downstream again
|
||||
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
||||
int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
|
||||
if (pTaskId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SDownstreamStatusInfo* p = NULL;
|
||||
findCheckRspStatus(pInfo, taskId, &p);
|
||||
findCheckRspStatus(pInfo, *pTaskId, &p);
|
||||
if (p != NULL) {
|
||||
p->rspTs = 0;
|
||||
p->status = -1;
|
||||
|
|
|
@ -36,8 +36,8 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t
|
|||
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
|
||||
for (int32_t i = 0; i < blockNum; i++) {
|
||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i);
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
||||
if (pDataBlock == NULL) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
||||
if (pDataBlock == NULL || pRetrieve == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -268,6 +268,10 @@ static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStre
|
|||
|
||||
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyDispatchMsg(pReqs, numOfVgroups);
|
||||
|
@ -300,6 +304,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
|||
|
||||
if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||
SSDataBlock* p = taosArrayGet(pData->blocks, 0);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pTask->msgInfo.checkpointId = p->info.version;
|
||||
pTask->msgInfo.transId = p->info.window.ekey;
|
||||
}
|
||||
|
@ -313,6 +321,11 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
|||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||
if (pDataBlock == NULL) {
|
||||
destroyDispatchMsg(pReqs, 1);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyDispatchMsg(pReqs, 1);
|
||||
|
@ -328,6 +341,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
|||
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||
if (pDataBlock == NULL) {
|
||||
destroyDispatchMsg(pReqs, numOfVgroups);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// TODO: do not use broadcast
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT ||
|
||||
|
@ -342,6 +359,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
|||
// it's a new vnode to receive dispatch msg, so add one
|
||||
if (pReqs[j].blockNum == 0) {
|
||||
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
||||
if (pDstVgroupInfo == NULL) {
|
||||
destroyDispatchMsg(pReqs, numOfVgroups);
|
||||
return terrno;
|
||||
}
|
||||
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true);
|
||||
}
|
||||
|
||||
|
@ -399,6 +420,11 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
|
|||
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||
if (pDispatchMsg[i].blockNum > 0) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
if (pVgInfo == NULL) {
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
|
||||
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", id, pTask->info.selfChildId,
|
||||
pDispatchMsg[i].blockNum, pVgInfo->vgId);
|
||||
|
||||
|
@ -457,6 +483,10 @@ static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int
|
|||
setResendInfo(pEntry, now);
|
||||
for (int32_t j = 0; j < numOfVgroups; ++j) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pVgInfo->vgId == pEntry->nodeId) {
|
||||
int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
|
||||
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s",
|
||||
|
@ -521,6 +551,10 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
|
|||
int32_t numOfRetry = 0;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
|
||||
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
|
||||
continue;
|
||||
}
|
||||
|
@ -553,14 +587,17 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
|
|||
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
|
||||
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||
|
||||
ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1);
|
||||
int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
|
||||
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
|
||||
if (pEntry != NULL) {
|
||||
setResendInfo(pEntry, now);
|
||||
code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
|
||||
|
||||
setResendInfo(pEntry, now);
|
||||
code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
|
||||
|
||||
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
|
||||
pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
|
||||
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
|
||||
pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
|
||||
} else {
|
||||
stError("s-task:%s invalid index 0, size:%d", id, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -637,6 +674,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
|
||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
||||
|
@ -646,7 +686,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
|
||||
if (pReqs[j].blockNum == 0) {
|
||||
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
||||
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
|
||||
if (pDstVgroupInfo != NULL) {
|
||||
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
|
||||
}
|
||||
}
|
||||
|
||||
pReqs[j].blockNum++;
|
||||
|
@ -832,6 +874,10 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
|||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
|
||||
if (pInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pInfo->sendCompleted == 1) {
|
||||
continue;
|
||||
}
|
||||
|
@ -846,11 +892,18 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
|||
int32_t notRsp = taosArrayGetSize(pNotRspList);
|
||||
if (notRsp > 0) { // send checkpoint-ready msg again
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pNotRspList, i);
|
||||
int32_t* pTaskId = taosArrayGet(pNotRspList, i);
|
||||
if (pTaskId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < num; ++j) {
|
||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j);
|
||||
if (taskId == pReadyInfo->upstreamTaskId) { // send msg again
|
||||
if (pReadyInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again
|
||||
|
||||
SRpcMsg msg = {0};
|
||||
int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, pReadyInfo->childId,
|
||||
|
@ -902,6 +955,9 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
|||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
|
||||
if (pInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SRpcMsg msg = {0};
|
||||
int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId,
|
||||
|
@ -945,11 +1001,14 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
|||
streamMutexLock(&pTask->chkInfo.pActiveInfo->lock);
|
||||
if (taosArrayGetSize(pList) == 1) {
|
||||
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0);
|
||||
tmsgSendRsp(&pInfo->msg);
|
||||
|
||||
taosArrayClear(pList);
|
||||
stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr,
|
||||
pTask->info.taskLevel);
|
||||
if (pInfo != NULL) {
|
||||
tmsgSendRsp(&pInfo->msg);
|
||||
taosArrayClear(pList);
|
||||
stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr,
|
||||
pTask->info.taskLevel);
|
||||
} else {
|
||||
// todo
|
||||
}
|
||||
} else {
|
||||
stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr,
|
||||
pTask->info.taskLevel);
|
||||
|
@ -1097,6 +1156,10 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
|
|||
ASSERT(size == 1);
|
||||
|
||||
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
|
||||
if (pReady == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pReady->transId == pReq->transId) {
|
||||
stWarn("s-task:%s repeatly recv checkpoint source msg from mnode, checkpointId:%" PRId64 ", ignore",
|
||||
pTask->id.idStr, pReq->checkpointId);
|
||||
|
@ -1104,7 +1167,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
|
|||
stError("s-task:%s checkpointId:%" PRId64 " transId:%d not completed, new transId:%d checkpointId:%" PRId64
|
||||
" recv from mnode",
|
||||
pTask->id.idStr, pReady->checkpointId, pReady->transId, pReq->transId, pReq->checkpointId);
|
||||
ASSERT(0); // failed to handle it
|
||||
}
|
||||
} else {
|
||||
(void) taosArrayPush(pActiveInfo->pReadyMsgList, &info);
|
||||
|
@ -1168,7 +1230,9 @@ void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
|
|||
|
||||
for (int i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); i++) {
|
||||
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i);
|
||||
rpcFreeCont(pInfo->msg.pCont);
|
||||
if (pInfo != NULL) {
|
||||
rpcFreeCont(pInfo->msg.pCont);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayClear(pActiveInfo->pReadyMsgList);
|
||||
|
@ -1215,6 +1279,10 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
|
|||
|
||||
for(int32_t i = 0; i < numOfDispatchBranch; ++i) {
|
||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->rspTs != -1) {
|
||||
numOfRsp += 1;
|
||||
}
|
||||
|
@ -1222,6 +1290,10 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
|
|||
|
||||
for (int32_t j = 0; j < numOfDispatchBranch; ++j) {
|
||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->nodeId == vgId) {
|
||||
ASSERT(!alreadySet);
|
||||
pEntry->rspTs = now;
|
||||
|
@ -1254,6 +1326,10 @@ int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) {
|
|||
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
|
||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) {
|
||||
numOfFailed += 1;
|
||||
}
|
||||
|
|
|
@ -87,6 +87,10 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
|
|||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
|
||||
if (pInfo == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
|
||||
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
|
||||
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
|
||||
|
@ -228,10 +232,14 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
|||
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
|
||||
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
|
||||
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
||||
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
|
||||
void* data = taosArrayGetP(pReq->data, i);
|
||||
if (tEncodeI32(pEncoder, len) < 0) return -1;
|
||||
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
|
||||
int32_t* pLen = taosArrayGet(pReq->dataLen, i);
|
||||
void* data = taosArrayGetP(pReq->data, i);
|
||||
if (data == NULL || pLen == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (tEncodeI32(pEncoder, *pLen) < 0) return -1;
|
||||
if (tEncodeBinary(pEncoder, data, *pLen) < 0) return -1;
|
||||
}
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
|
@ -341,6 +349,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
|||
|
||||
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
||||
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
|
||||
if (ps == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
|
||||
|
@ -378,6 +390,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
|||
|
||||
for (int j = 0; j < numOfVgs; ++j) {
|
||||
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
|
||||
if (pVgId == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -64,8 +64,12 @@ int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_
|
|||
int64_t getSessionWindowEndkey(void* data, int32_t index) {
|
||||
SArray* pWinInfos = (SArray*)data;
|
||||
SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
|
||||
SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
|
||||
return pWin->win.ekey;
|
||||
if (ppos != NULL) {
|
||||
SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
|
||||
return pWin->win.ekey;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
|
||||
|
|
|
@ -399,6 +399,10 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
|
|||
// unite read/write snap file
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
|
||||
SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i);
|
||||
if (pItem == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pItem->ref == 0) {
|
||||
taosMemoryFree(pItem->name);
|
||||
}
|
||||
|
|
|
@ -555,6 +555,9 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pVgInfo->vgId == nodeId) {
|
||||
bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
|
||||
|
@ -636,6 +639,10 @@ bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
|
|||
bool updated = false;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
|
||||
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
||||
if (pInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
|
||||
if (code) {
|
||||
stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
|
||||
|
@ -1013,6 +1020,10 @@ SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
|
|||
SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pVgInfo->taskId == taskId) {
|
||||
return &pVgInfo->epSet;
|
||||
}
|
||||
|
|
|
@ -164,6 +164,10 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
|
|||
int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
|
||||
for (int32_t i = 0; i < numOfTrans; ++i) {
|
||||
STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i);
|
||||
if (pTrans == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTrans->state.state == state && pTrans->event == event) {
|
||||
return pTrans;
|
||||
}
|
||||
|
@ -187,6 +191,9 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
|||
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
|
||||
|
||||
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
||||
if (pEvtInfo == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// OK, let's handle the waiting event, since the task has reached the required status now
|
||||
if (pSM->current.state == pEvtInfo->status) {
|
||||
|
@ -227,6 +234,10 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve
|
|||
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
|
||||
if (pInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pInfo->event == event) {
|
||||
taosArrayRemove(pSM->pWaitingEventList, i);
|
||||
stDebug("s-task:%s %s event in waiting list not be handled yet, remove it from waiting list, remaining events:%d",
|
||||
|
|
Loading…
Reference in New Issue