fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-22 09:13:39 +08:00
parent 651077866e
commit 7201526674
16 changed files with 882 additions and 436 deletions

View File

@ -686,7 +686,7 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
// checkpoint related
int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId);
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId);
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId);
int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
@ -770,9 +770,9 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask);
// timer
tmr_h streamTimerGetInstance();
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg);
int32_t streamTimerGetInstance(tmr_h* pTmr);
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
@ -809,6 +809,9 @@ void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask);
void streamMutexLock(TdThreadMutex *pMutex);
void streamMutexUnlock(TdThreadMutex *pMutex);
void streamMutexDestroy(TdThreadMutex *pMutex);
#ifdef __cplusplus
}

View File

@ -2491,7 +2491,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
.tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};
int32_t code = buildChildTableName(&rname);
if(code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosArrayDestroy(tags);

View File

@ -150,7 +150,6 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t cap) {
cap -= nwrite;
for (int _i = 0; (_i < pEpSet->numOfEps) && (cap > 0); _i++) {
int32_t ret = 0;
if (_i == pEpSet->numOfEps - 1) {
ret = snprintf(pBuf + nwrite, cap, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
} else {

View File

@ -155,8 +155,8 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_
SSubmitTbData* pTableData, int64_t earlyTs, const char* id);
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule);
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
#define TQ_ERR_GO_TO_END(c) \
do { \

View File

@ -196,7 +196,12 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);
code = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq);
if (code) {
smaError("failed to build create-table req, code:%d", code);
continue;
}
{
uint64_t groupId = pDataBlock->info.id.groupId;

View File

@ -18,8 +18,6 @@
#include "tmsg.h"
#include "tq.h"
#define MAX_CACHE_TABLE_INFO_NUM 10240
typedef struct STableSinkInfo {
uint64_t uid;
tstr name;
@ -43,7 +41,7 @@ static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, con
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
int32_t numOfTags);
static SArray* createDefaultTagColName();
static int32_t createDefaultTagColName(SArray** pList);
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule);
static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo);
@ -95,7 +93,10 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
taosArrayPush(deleteReq->deleteReqs, &req);
void* p = taosArrayPush(deleteReq->deleteReqs, &req);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (originName) name = originName;
taosMemoryFreeClear(name);
@ -149,14 +150,20 @@ static bool tqGetTableInfo(SSHashObj* pTableInfoMap, uint64_t groupId, STableSin
static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL;
int32_t tlen = 0;
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
int32_t code = encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
if (code) {
tqError("vgId:%d failed to encode create table msg, create table failed, code:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if (code) {
tqError("failed to put into write-queue since %s", terrstr());
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
@ -166,18 +173,36 @@ int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
int32_t code = tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (code == 0) {
pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code
tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
}
}
pCreateTableReq->ctb.tagNum = numOfTags;
return TSDB_CODE_SUCCESS;
return code;
}
SArray* createDefaultTagColName() {
int32_t createDefaultTagColName(SArray** pColNameList) {
*pColNameList = NULL;
SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
taosArrayPush(pTagColNameList, tagNameStr);
return pTagColNameList;
if (pTagColNameList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
void* p = taosArrayPush(pTagColNameList, tagNameStr);
if (p == NULL) {
taosArrayDestroy(pTagColNameList);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pColNameList = pTagColNameList;
return TSDB_CODE_SUCCESS;
}
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
@ -201,18 +226,20 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
SStreamTask* pTask, int64_t suid) {
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows;
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows;
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
tqDebug("s-task:%s build create %d table(s) msg", pTask->id.idStr, rows);
tqDebug("s-task:%s build create %d table(s) msg", id, rows);
int32_t code = 0;
SVCreateTbBatchReq reqs = {0};
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) {
tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno));
tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
goto _end;
}
@ -222,15 +249,26 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
if (code) {
tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
continue;
}
taosArrayClear(tagArray);
if (size == 2) {
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagName = createDefaultTagColName();
void* p = taosArrayPush(tagArray, &tagVal);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
if (code) {
return code;
}
} else {
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
@ -245,14 +283,17 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
} else {
memcpy(&tagVal.i64, pData, pTagData->info.bytes);
}
taosArrayPush(tagArray, &tagVal);
void* p = taosArrayPush(tagArray, &tagVal);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
}
tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
taosArrayDestroy(tagArray);
tagArray = NULL;
if (pCreateTbReq->ctb.pTag == NULL) {
if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
tdDestroySVCreateTbReq(pCreateTbReq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
@ -270,22 +311,34 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayPush(reqs.pArray, pCreateTbReq);
void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
STableSinkInfo* pInfo = NULL;
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo);
if (!alreadyCached) {
code = doCreateSinkInfo(pCreateTbReq->name, &pInfo);
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, pTask->id.idStr);
if (code) {
tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
continue;
}
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, id);
if (code) {
tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
}
}
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
}
reqs.nReqs = taosArrayGetSize(reqs.pArray);
code = tqPutReqToQueue(pVnode, &reqs);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to send create table msg", pTask->id.idStr);
tqError("s-task:%s failed to send create table msg", id);
}
_end:
@ -348,15 +401,26 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
if (pNewRow->ts < pOldRow->ts) {
taosArrayPush(pFinal, &pNewRow);
void* p = taosArrayPush(pFinal, &pNewRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
j += 1;
} else if (pNewRow->ts > pOldRow->ts) {
taosArrayPush(pFinal, &pOldRow);
void* p = taosArrayPush(pFinal, &pOldRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
k += 1;
} else {
// check for the existance of primary key
if (pNewRow->numOfPKs == 0) {
taosArrayPush(pFinal, &pNewRow);
void* p = taosArrayPush(pFinal, &pNewRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
k += 1;
j += 1;
tRowDestroy(pOldRow);
@ -369,7 +433,11 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
int32_t ret = tRowKeyCompare(&kNew, &kOld);
if (ret <= 0) {
taosArrayPush(pFinal, &pNewRow);
void* p = taosArrayPush(pFinal, &pNewRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
j += 1;
if (ret == 0) {
@ -377,7 +445,11 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
tRowDestroy(pOldRow);
}
} else {
taosArrayPush(pFinal, &pOldRow);
void* p = taosArrayPush(pFinal, &pOldRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
k += 1;
}
}
@ -386,12 +458,18 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
while (j < newLen) {
SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
taosArrayPush(pFinal, &pRow);
void* p = taosArrayPush(pFinal, &pRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
while (k < oldLen) {
SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
taosArrayPush(pFinal, &pRow);
void* p = taosArrayPush(pFinal, &pRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayDestroy(pNew->aRowP);
@ -425,34 +503,40 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
return true;
}
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule) {
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq) {
*pReq = NULL;
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pCreateTbReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayClear(pTagArray);
initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(pTagArray, &tagVal);
tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
if (code != 0) {
return code;
}
pCreateTbReq->ctb.tagName = createDefaultTagColName();
STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
void* p = taosArrayPush(pTagArray, &tagVal);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
return code;
}
code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
// set table name
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule);
return pCreateTbReq;
*pReq = pCreateTbReq;
return code;
}
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
@ -555,7 +639,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
break;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
void* p = taosArrayPush(pVals, &cv);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) {
@ -566,7 +653,11 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
void* p = taosArrayPush(pVals, &cv);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
dataIndex++;
} else {
void* colData = colDataGetData(pColData, j);
@ -574,12 +665,18 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
SValue sv =
(SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
taosArrayPush(pVals, &cv);
void* p = taosArrayPush(pVals, &cv);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
SValue sv = {.type = pCol->type};
memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
taosArrayPush(pVals, &cv);
void* p = taosArrayPush(pVals, &cv);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
dataIndex++;
}
@ -596,7 +693,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
}
ASSERT(pRow);
taosArrayPush(pTableData->aRowP, &pRow);
void* p = taosArrayPush(pTableData->aRowP, &pRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayDestroy(pVals);
@ -665,6 +765,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t vgId = TD_VID(pVnode);
STableSinkInfo* pTableSinkInfo = NULL;
int32_t code = 0;
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
@ -686,7 +787,11 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
} else { // this groupId has not been kept in cache yet
if (dstTableName[0] == 0) {
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
if (code) {
tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
return code;
}
} else {
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
!alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
@ -699,8 +804,13 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
}
}
int32_t code = doCreateSinkInfo(dstTableName, &pTableSinkInfo);
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
code = doCreateSinkInfo(dstTableName, &pTableSinkInfo);
if (code == 0) {
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
} else {
tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s", id, dstTableName);
return code;
}
}
if (alreadyCached) {
@ -731,20 +841,20 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
pTableData->pCreateTbReq =
code =
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1),
&pTableData->pCreateTbReq);
taosArrayDestroy(pTagArray);
if (pTableData->pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName,
tstrerror(terrno));
if (code) {
tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
taosMemoryFree(pTableSinkInfo);
return terrno;
return code;
}
pTableSinkInfo->uid = 0;
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
} else {
metaReaderClear(&mr);
@ -765,12 +875,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableSinkInfo->uid = mr.me.uid;
metaReaderClear(&mr);
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
}
}
}
return TDB_CODE_SUCCESS;
return code;
}
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
@ -864,14 +974,21 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
if (tbData.pCreateTbReq != NULL) {
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, pDataBlock->info.id.groupId, id);
(void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, pDataBlock->info.id.groupId, id);
tbData.pCreateTbReq = NULL;
}
continue;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
if (p == NULL) {
tqDebug("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
}
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
if (code) { // failed and continue
tqDebug("vgId:%d, s-task:%s submit msg failed, data lost", vgId, id);
}
}
}
} else {
@ -918,16 +1035,24 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
if (tbData.pCreateTbReq != NULL) {
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, groupId, id);
(void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, groupId, id);
tbData.pCreateTbReq = NULL;
}
continue;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
if (p == NULL) {
tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
continue;
}
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
if (code) {
tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
continue;
}
} else {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
@ -951,7 +1076,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
taosHashCleanup(pTableIndexMap);
if (hasSubmit) {
doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
if (code) { // failed and continue
tqError("vgId:%d failed to build and send submit msg", vgId);
}
} else {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
@ -989,7 +1117,11 @@ int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char
}
int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
if (code == 0) {
tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
} else {
tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
}
return code;
}
@ -1019,10 +1151,14 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
taosArrayDestroy(deleteReq.deleteReqs);
if (code) {
return code;
}
((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);
SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};

View File

@ -37,7 +37,12 @@ int32_t tqScanWal(STQ* pTq) {
// check all tasks
int32_t numOfTasks = 0;
bool shouldIdle = true;
doScanWalForAllTasks(pMeta, &shouldIdle);
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
if (code) {
tqError("vgId:%d failed to start all tasks, try next time", vgId);
return code;
}
streamMetaWLock(pMeta);
int32_t times = (--pMeta->scanInfo.scanCounter);
@ -51,9 +56,13 @@ int32_t tqScanWal(STQ* pTq) {
if (times > 0) {
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
if (code) {
tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION);
}
}
return 0;
return code;
}
typedef struct SBuildScanWalMsgParam {
@ -69,28 +78,44 @@ static void doStartScanWal(void* param, void* tmrId) {
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pTq->pVnode->restored);
/*int32_t code = */ streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
taosMemoryFree(pParam);
if (code) {
tqError("vgId:%d failed sched task to scan wal", vgId);
}
}
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
if (pParam == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pParam->pTq = pTq;
pParam->numOfTasks = numOfTasks;
tmr_h pTimer = streamTimerGetInstance();
ASSERT(pTimer);
tmr_h pTimer = NULL;
code = streamTimerGetInstance(&pTimer);
if (code) {
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
return code;
}
if (pMeta->scanInfo.scanTimer == NULL) {
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
} else {
taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
code = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
if (code) {
tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration);
}
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
@ -207,7 +232,11 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
id, pTask->step2Range.minVer, maxVer, el);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
if (code) {
qError("s-task:%s failed to put trans-state into inputQ", id);
}
return true;
} else {
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
@ -290,8 +319,12 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
break;
}
} else {
walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
tqTrace("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
if (code) {
tqError("s-task:%s failed to seek ver to:%"PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
}
break;
}
}
@ -347,25 +380,25 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState.name);
taosThreadMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems);
taosThreadMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
if ((numOfItems > 0) || hasNewData) {
noDataInWal = false;
code = streamTrySchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
return -1;
return code;
}
}
@ -378,5 +411,5 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
taosArrayDestroy(pTaskList);
return 0;
return TSDB_CODE_SUCCESS;
}

View File

@ -160,6 +160,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t st = taosGetTimestampMs();
bool updated = false;
int32_t code = 0;
SStreamTaskNodeUpdateMsg req = {0};
@ -258,26 +259,40 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
}
}
// save
// stream do update the nodeEp info, write it into stream meta.
if (updated) {
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
streamMetaSaveTask(pMeta, pTask);
code = streamMetaSaveTask(pMeta, pTask);
if (code) {
tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
}
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
code = streamMetaSaveTask(pMeta, *ppHTask);
if (code) {
tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
}
}
} else {
tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
}
streamTaskStop(pTask);
code = streamTaskStop(pTask);
if (code) {
tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
}
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
code = streamTaskStop(*ppHTask);
if (code) {
tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
}
}
// keep info
streamMetaAddIntoUpdateTaskList(pMeta, pTask, (ppHTask != NULL) ? (*ppHTask) : NULL, req.transId, st);
rsp.code = 0;
rsp.code = TSDB_CODE_SUCCESS;
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
@ -305,13 +320,16 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
#if 0
taosMSleep(5000);// for test purpose, to trigger the leader election
#endif
tqStreamTaskStartAsync(pMeta, cb, true);
code = tqStreamTaskStartAsync(pMeta, cb, true);
if (code) {
tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
}
}
}
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
return rsp.code; // always return true
}
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
@ -518,7 +536,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
if (pTask == NULL) {
if (code != 0) {
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
@ -526,11 +544,17 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId);
code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId);
streamMetaReleaseTask(pMeta, pTask);
if (code) {
return code;
}
{ // send checkpoint ready rsp
SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
if (pReadyRsp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReadyRsp->upstreamTaskId = req.upstreamTaskId;
pReadyRsp->upstreamNodeId = req.upstreamNodeId;
@ -607,7 +631,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
SStreamTask* p = NULL;
code = streamMetaAcquireTask(pMeta, streamId, taskId, &p);
if ((p != NULL) && (p->info.fillHistory == 0)) {
tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
}
if (p != NULL) {
@ -631,6 +655,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
STaskId hTaskId = {0};
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
@ -649,8 +674,12 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
}
streamTaskSetRemoveBackendFiles(pTask);
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
streamMetaReleaseTask(pMeta, pTask);
if (code) {
tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
}
}
streamMetaWUnLock(pMeta);
@ -746,7 +775,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
if (isLeader && !tsDisableStream) {
streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta);
code = streamMetaStartAllTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
pMeta->startInfo.restartCount = 0;
@ -765,16 +794,16 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t vgId = pMeta->vgId;
if (type == STREAM_EXEC_T_START_ONE_TASK) {
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
(void) streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
streamMetaStartAllTasks(pMeta);
(void) streamMetaStartAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
restartStreamTasks(pMeta, isLeader);
(void) restartStreamTasks(pMeta, isLeader);
return 0;
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
streamMetaStopAllTasks(pMeta);
(void) streamMetaStopAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
@ -783,7 +812,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask != NULL) {
if (pTask != NULL || (code != 0)) {
char* pStatus = NULL;
if (streamTaskReadyToRun(pTask, &pStatus)) {
int64_t execTs = pTask->status.lastExecTs;
@ -804,12 +833,12 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
if (pTask != NULL || (code != 0)) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) {
tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
streamExecTask(pTask);
(void) streamExecTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
@ -829,6 +858,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
int32_t vgId = pMeta->vgId;
bool scanWal = false;
int32_t code = 0;
streamMetaWLock(pMeta);
if (pStartInfo->startAllTasks == 1) {
@ -844,8 +874,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
pStartInfo->restartCount);
streamMetaWUnLock(pMeta);
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
return TSDB_CODE_SUCCESS;
return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
} else {
if (pStartInfo->restartCount == 0) {
tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId);
@ -862,10 +891,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
if (scanWal && (vgId != SNODE_HANDLE)) {
tqDebug("vgId:%d start scan wal for executing tasks", vgId);
tqScanWalAsync(pMeta->ahandle, true);
code = tqScanWalAsync(pMeta->ahandle, true);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
@ -873,7 +902,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) {
if (pTask == NULL || (code != 0)) {
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
@ -881,7 +910,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
taosThreadMutexLock(&pTask->lock);
streamMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, true);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
@ -904,7 +933,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
}
taosThreadMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
@ -929,11 +958,10 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
pTask->id.idStr, (int32_t)pReq->downstreamTaskId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
return code;
}
SStreamTaskState pState = streamTaskGetStatus(pTask);
@ -948,7 +976,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
// re-send the lost checkpoint-trigger msg to downstream task
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_SUCCESS);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
@ -962,7 +990,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"sending checkpoint-source/trigger",
pTask->id.idStr, recv, total);
}
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
} else { // upstream not recv the checkpoint-source/trigger till now
@ -971,12 +999,12 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
pTask->id.idStr);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
return code;
}
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
@ -994,9 +1022,9 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);
code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
return code;
}
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
@ -1042,6 +1070,8 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
bool fromVnode) {
SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
if (pTask == NULL) {
return -1;
}
@ -1065,9 +1095,9 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
pTask->hTaskInfo.operatorOpen = false;
streamStartScanHistoryAsync(pTask, igUntreated);
code = streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync((STQ*)handle, false);
code = tqScanWalAsync((STQ*)handle, false);
} else {
streamTrySchedExec(pTask);
}
@ -1076,7 +1106,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
}*/
streamMetaReleaseTask(pMeta, pTask);
return 0;
return code;
}
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
@ -1091,10 +1121,10 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
taosThreadMutexLock(&pTask->lock);
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
taosThreadMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) {
@ -1105,10 +1135,10 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
SStreamTask* pHTask = NULL;
code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
if (pHTask) {
taosThreadMutexLock(&pHTask->lock);
streamMutexLock(&pHTask->lock);
SStreamTaskState p = streamTaskGetStatus(pHTask);
tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
taosThreadMutexUnlock(&pHTask->lock);
streamMutexUnlock(&pHTask->lock);
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
}
@ -1139,15 +1169,15 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId, &pTask);
if (pTask == NULL) {
if (pTask == NULL || (code != 0)) {
tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
pRsp->downstreamNodeId, pRsp->downstreamTaskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
code = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
return code;
}
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
@ -1173,11 +1203,11 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL) {
if (pTask == NULL || (code != 0)) {
tqError("vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, req.taskId);
streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return TSDB_CODE_SUCCESS;
(void)streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return code;
}
// discard the rsp, since it is expired.
@ -1193,13 +1223,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);
taosThreadMutexLock(&pTask->lock);
streamMutexLock(&pTask->lock);
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
if (pTask->chkInfo.consensusTransId >= req.transId) {
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard",
pTask->id.idStr, vgId, pTask->chkInfo.consensusTransId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
@ -1215,14 +1245,14 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
}
pTask->chkInfo.consensusTransId = req.transId;
taosThreadMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
if (pMeta->role == NODE_ROLE_LEADER) {
/*code = */ tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
} else {
tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
return code;
}

View File

@ -231,14 +231,8 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t up
int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId,
int64_t checkpointId, SRpcMsg* pMsg);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code);
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
void streamMutexLock(TdThreadMutex *pMutex);
void streamMutexUnlock(TdThreadMutex *pMutex);
void streamMutexDestroy(TdThreadMutex *pMutex);
#ifdef __cplusplus
}

View File

@ -1540,7 +1540,7 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId)
// compatible with previous version
*processId = -1;
code = 0;
stError("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(TAOS_SYSTEM_ERROR(errno)));
stWarn("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(TAOS_SYSTEM_ERROR(errno)));
goto _EXIT;
}
@ -2308,6 +2308,7 @@ _EXIT:
taosMemoryFree(cfHandle);
return code;
}
void* taskDbAddRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb;
return taosAcquireRef(taskDbWrapperId, pBackend->refId);

View File

@ -45,7 +45,7 @@ typedef struct STaskInitTs {
SMetaRefMgt gMetaRefMgt;
void metaRefMgtInit();
int32_t metaRefMgtInit();
void metaRefMgtCleanup();
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
@ -56,9 +56,14 @@ static void streamMetaEnvInit() {
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
metaRefMgtInit();
int32_t code = streamTimerInit();
if (code != 0) {
int32_t code = metaRefMgtInit();
if (code) {
stError("failed to init stream meta mgmt env, start failed");
return;
}
code = streamTimerInit();
if (code) {
stError("failed to init stream meta env, start failed");
}
}
@ -66,17 +71,29 @@ static void streamMetaEnvInit() {
void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() {
taosCloseRef(streamBackendId);
taosCloseRef(streamBackendCfWrapperId);
taosCloseRef(streamMetaId);
(void) taosCloseRef(streamBackendId);
(void) taosCloseRef(streamBackendCfWrapperId);
(void) taosCloseRef(streamMetaId);
metaRefMgtCleanup();
streamTimerCleanUp();
}
void metaRefMgtInit() {
taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
int32_t metaRefMgtInit() {
int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
if (code) {
return code;
}
if (code == 0) {
gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
}
if (gMetaRefMgt.pTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return code;
}
}
void metaRefMgtCleanup() {
@ -96,20 +113,34 @@ void metaRefMgtCleanup() {
}
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
int32_t code = 0;
void* p = NULL;
streamMutexLock(&gMetaRefMgt.mutex);
void* p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId));
p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId));
if (p == NULL) {
SArray* list = taosArrayInit(8, sizeof(void*));
taosArrayPush(list, &rid);
taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
p = taosArrayPush(list, &rid);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
if (code) {
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t) vgId, *rid);
return code;
}
} else {
SArray* list = *(SArray**)p;
taosArrayPush(list, &rid);
void* px = taosArrayPush(list, &rid);
if (px == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
streamMutexUnlock(&gMetaRefMgt.mutex);
return 0;
return code;
}
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
@ -141,19 +172,25 @@ enum STREAM_STATE_VER {
};
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
int8_t ret = STREAM_STATA_COMPATIBLE;
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream
return ret;
}
int8_t ret = STREAM_STATA_COMPATIBLE;
TBC* pCur = NULL;
int32_t code = 0;
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
tdbTbcMoveToFirst(pCur);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream
return ret;
}
code = tdbTbcMoveToFirst(pCur);
if (code) {
(void) tdbTbcClose(pCur);
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId);
return ret;
}
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal == NULL || vLen == 0) {
break;
@ -178,7 +215,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
(void) tdbTbcClose(pCur);
return ret;
}
@ -244,7 +281,11 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
streamMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
if ((ppBackend != NULL) && (*ppBackend != NULL)) {
taskDbAddRef(*ppBackend);
void* p = taskDbAddRef(*ppBackend);
if (p == NULL) {
stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
return TSDB_CODE_FAILED;
}
STaskDbWrapper* pBackend = *ppBackend;
pBackend->pMeta = pMeta;
@ -278,7 +319,10 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
if (processVer != -1) pTask->chkInfo.processedVer = processVer;
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
int32_t code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
if (code) {
stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
}
streamMutexUnlock(&pMeta->backendMutex);
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
@ -290,7 +334,10 @@ void streamMetaRemoveDB(void* arg, char* key) {
SStreamMeta* pMeta = arg;
streamMutexLock(&pMeta->backendMutex);
taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
if (code) {
stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
}
streamMutexUnlock(&pMeta->backendMutex);
}
@ -398,12 +445,22 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
}
#endif
taosThreadRwlockInit(&pMeta->lock, &attr);
taosThreadRwlockAttrDestroy(&attr);
code = taosThreadRwlockInit(&pMeta->lock, &attr);
if (code) {
goto _err;
}
code = taosThreadRwlockAttrDestroy(&attr);
if (code) {
goto _err;
}
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
code = metaRefMgtAdd(pMeta->vgId, pRid);
if (code) {
goto _err;
}
code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
if (code != TSDB_CODE_SUCCESS) {
@ -416,7 +473,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
if (pMeta->bkdChkptMgt == NULL) {
goto _err;
}
taosThreadMutexInit(&pMeta->backendMutex, NULL);
code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
*p = pMeta;
return code;
@ -425,9 +483,9 @@ _err:
taosMemoryFree(pMeta->path);
if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) (void) tdbClose(pMeta->db);
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
@ -473,7 +531,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
// release the ref by timer
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
taosTmrStop(p->schedInfo.pDelayTimer);
(void) taosTmrStop(p->schedInfo.pDelayTimer);
p->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, p);
}
@ -481,7 +539,11 @@ void streamMetaClear(SStreamMeta* pMeta) {
streamMetaReleaseTask(pMeta, p);
}
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
int32_t code = taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
if (code) {
stError("vgId:%d remove stream backend Ref failed, rid:%"PRId64, pMeta->vgId, pMeta->streamBackendRid);
}
taosHashClear(pMeta->pTasksMap);
taosArrayClear(pMeta->pTaskList);
@ -502,14 +564,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pMeta == NULL) {
return;
}
// int64_t rid = *(int64_t*)pMeta->pRid;
// if (taosTmrStop(pMeta->hbInfo.hbTmr)) {
// taosMemoryFree(pMeta->pRid);
// } else {
// // do nothing, stop by timer thread
// }
taosRemoveRef(streamMetaId, pMeta->rid);
(void) taosRemoveRef(streamMetaId, pMeta->rid);
}
void streamMetaCloseImpl(void* arg) {
@ -525,10 +580,11 @@ void streamMetaCloseImpl(void* arg) {
streamMetaClear(pMeta);
streamMetaWUnLock(pMeta);
tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db);
// already log the error, ignore here
(void) tdbAbort(pMeta->db, pMeta->txn);
(void) tdbTbClose(pMeta->pTaskDb);
(void) tdbTbClose(pMeta->pCheckpointDb);
(void) tdbClose(pMeta->db);
taosArrayDestroy(pMeta->pTaskList);
taosArrayDestroy(pMeta->chkpSaved);
@ -552,7 +608,7 @@ void streamMetaCloseImpl(void* arg) {
bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT;
taosThreadRwlockDestroy(&pMeta->lock);
(void) taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta);
stDebug("vgId:%d end to close stream meta", vgId);
@ -568,9 +624,10 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
if (code < 0) {
return -1;
}
buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
@ -579,13 +636,19 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len);
tEncodeStreamTask(&encoder, pTask);
code = tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
if (code) {
stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
return TSDB_CODE_INVALID_MSG;
}
int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};
code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
if (code != TSDB_CODE_SUCCESS) {
code = terrno;
stError("s-task:%s vgId:%d task meta save to disk failed, code:%s", pTask->id.idStr, vgId, tstrerror(terrno));
} else {
stDebug("s-task:%s vgId:%d task meta save to disk", pTask->id.idStr, vgId);
@ -612,33 +675,44 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false;
int32_t code = 0;
STaskId id = streamTaskGetTaskId(pTask);
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p != NULL) {
return 0;
stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId);
return code;
}
if (pMeta->buildTaskFn(pMeta->ahandle, pTask, ver) < 0) {
return -1;
if ((code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver)) != 0) {
return code;
}
taosArrayPush(pMeta->pTaskList, &pTask->id);
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (streamMetaSaveTask(pMeta, pTask) < 0) {
return -1;
p = taosArrayPush(pMeta->pTaskList, &pTask->id);
if (p == NULL) {
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (streamMetaCommit(pMeta) < 0) {
return -1;
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (code) {
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId);
return code;
}
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
return code;
}
if ((code = streamMetaCommit(pMeta)) != 0) {
return code;
}
if (pTask->info.fillHistory == 0) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
*pAdded = true;
return 0;
return code;
}
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
@ -703,7 +777,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamTaskSendCheckpointSourceRsp(pTask);
(void) streamTaskSendCheckpointSourceRsp(pTask);
}
return 0;
}
@ -726,7 +800,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
}
// handle the dropping event
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
(void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta);
@ -762,12 +836,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
(void) atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
}
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
(void) taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
streamMetaRemoveTask(pMeta, &id);
(void) streamMetaRemoveTask(pMeta, &id);
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
streamMetaWUnLock(pMeta);
@ -775,7 +849,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ASSERT(pTask->status.timerActive == 0);
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
taosTmrStop(pTask->schedInfo.pDelayTimer);
(void) taosTmrStop(pTask->schedInfo.pDelayTimer);
pTask->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, pTask);
}
@ -820,9 +894,11 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) {
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
int64_t checkpointId = 0;
int32_t code = 0;
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
return checkpointId;
}
@ -832,7 +908,13 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
int32_t vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
code = tdbTbcMoveToFirst(pCur);
if (code) {
(void) tdbTbcClose(pCur);
stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
return checkpointId;
}
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal == NULL || vLen == 0) {
break;
@ -851,8 +933,8 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
(void)tdbTbcClose(pCur);
return checkpointId;
}
@ -864,23 +946,34 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
int32_t vgId = 0;
int32_t code = 0;
SArray* pRecycleList = NULL;
if (pMeta == NULL) {
return;
}
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
int32_t vgId = pMeta->vgId;
pRecycleList = taosArrayInit(4, sizeof(STaskId));
vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
int32_t code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList);
return;
}
tdbTbcMoveToFirst(pCur);
code = tdbTbcMoveToFirst(pCur);
if (code) {
stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList);
(void) tdbTbcClose(pCur);
return;
}
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal == NULL || vLen == 0) {
break;
@ -910,7 +1003,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask);
STaskId id = streamTaskGetTaskId(pTask);
taosArrayPush(pRecycleList, &id);
(void) taosArrayPush(pRecycleList, &id);
int32_t total = taosArrayGetSize(pRecycleList);
stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
@ -931,7 +1024,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
taosArrayPush(pMeta->pTaskList, &pTask->id);
(void) taosArrayPush(pMeta->pTaskList, &pTask->id);
} else {
// todo this should replace the existed object put by replay creating stream task msg from mnode
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
@ -941,17 +1034,17 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) {
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
taosArrayPop(pMeta->pTaskList);
(void) taosArrayPop(pMeta->pTaskList);
tFreeStreamTask(pTask);
continue;
}
if (pTask->info.fillHistory == 0) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
if (streamTaskShouldPause(pTask)) {
atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
(void) atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
}
ASSERT(pTask->status.downstreamReady == 0);
@ -967,7 +1060,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosArrayGetSize(pRecycleList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
STaskId* pId = taosArrayGet(pRecycleList, i);
streamMetaRemoveTask(pMeta, pId);
(void) streamMetaRemoveTask(pMeta, pId);
}
}
@ -995,7 +1088,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive >= 1) {
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
streamTaskStop(pTask);
(void) streamTaskStop(pTask);
inTimer = true;
}
}
@ -1028,7 +1121,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter;
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
streamTaskStop(pTask);
(void) streamTaskStop(pTask);
}
streamMetaWUnLock(pMeta);
@ -1047,7 +1140,16 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
metaRefMgtAdd(pMeta->vgId, pRid);
if (pRid == NULL) {
stError("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
return;
}
int32_t code = metaRefMgtAdd(pMeta->vgId, pRid);
if (code) {
return;
}
*pRid = pMeta->rid;
streamMetaHbToMnode(pRid, NULL);
}
@ -1066,7 +1168,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock);
(void) taosThreadRwlockRdlock(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
@ -1081,30 +1183,13 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock);
(void) taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock);
}
static void execHelper(struct SSchedMsg* pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) {
*(int32_t*)pSchedMsg->msg = code;
}
}
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code) {
SSchedMsg schedMsg = {0};
schedMsg.fp = execHelper;
schedMsg.ahandle = fn;
schedMsg.thandle = param;
schedMsg.msg = code;
return taosScheduleTask(pMeta->qHandle, &schedMsg);
(void) taosThreadRwlockUnlock(&pMeta->lock);
}
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
@ -1192,10 +1277,10 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = now;
streamMetaResetTaskStatus(pMeta);
int32_t code = streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
return code;
}
// restore the checkpoint id by negotiating the latest consensus checkpoint id
@ -1230,7 +1315,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
@ -1253,7 +1338,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
@ -1271,7 +1356,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
(void) streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
}
(void) streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true);
@ -1337,7 +1422,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
continue;
}
streamTaskStop(pTask);
(void) streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
@ -1377,7 +1462,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId);
streamMetaAddFailedTask(pMeta, streamId, taskId);
(void) streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
@ -1468,7 +1553,10 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) {
}
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
@ -1488,14 +1576,14 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
streamMetaResetStartInfo(pStartInfo, pMeta->vgId);
streamMetaWUnLock(pMeta);
pStartInfo->completeFn(pMeta);
code = pStartInfo->completeFn(pMeta);
} else {
streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
ready, numOfRecv, numOfTotal);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
@ -1569,19 +1657,26 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
int64_t startTs) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;
// keep the already updated info
STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
code = taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (code != 0) {
stError("s-task:%s failed to put updateTask into update list", id);
}
int64_t el = taosGetTimestampMs() - startTs;
if (pHTask != NULL) {
STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
" ms",
id, vgId, transId, el);
code = taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
if (code != 0) {
stError("s-task:%s failed to put updateTask into update list", id);
} else {
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
" ms",
id, vgId, transId, el);
}
} else {
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
id, vgId, transId, el);

View File

@ -130,7 +130,6 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int32_t ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = taosStatFile(fullname, sz, NULL, NULL);
@ -185,48 +184,89 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
}
int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
void* p = NULL;
SBackendFileItem item = {0};
item.ref = 1;
// current
item.name = pSnapFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
int32_t code = streamGetFileSize(pSnapFile->path, item.name, &item.size);
if (code) {
stError("failed to get file size");
return code;
}
p = taosArrayPush(pSnapFile->pFileList, &item);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// mainfest
item.name = pSnapFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
code = streamGetFileSize(pSnapFile->path, item.name, &item.size);
if (code) {
return code;
}
p = taosArrayPush(pSnapFile->pFileList, &item);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// options
item.name = pSnapFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
code = streamGetFileSize(pSnapFile->path, item.name, &item.size);
if (code) {
return code;
}
p = taosArrayPush(pSnapFile->pFileList, &item);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// sst
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* sst = taosArrayGetP(pSnapFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
code = streamGetFileSize(pSnapFile->path, item.name, &item.size);
if (code) {
return code;
}
p = taosArrayPush(pSnapFile->pFileList, &item);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
// meta
item.name = pSnapFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
taosArrayPush(pSnapFile->pFileList, &item);
p = taosArrayPush(pSnapFile->pFileList, &item);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
item.name = pSnapFile->pCheckpointSelfcheck;
item.type = ROCKSDB_CHECKPOINT_SELFCHECK_TYPE;
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
taosArrayPush(pSnapFile->pFileList, &item);
p = taosArrayPush(pSnapFile->pFileList, &item);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return 0;
}
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
int32_t code = 0;
TdDirPtr pDir = taosOpenDir(pSnapFile->path);
@ -288,12 +328,18 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
taosArrayPush(pSnapFile->pSst, &sst);
void* p = taosArrayPush(pSnapFile->pSst, &sst);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
}
}
taosCloseDir(&pDir);
return code;
return taosCloseDir(&pDir);
}
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
int32_t code = 0;
int32_t nBytes = 0;
@ -359,13 +405,16 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
}
taosArrayDestroy(pSnap->pFileList);
taosArrayDestroy(pSnap->pSst);
taosCloseFile(&pSnap->fd);
return;
int32_t code = taosCloseFile(&pSnap->fd);
if (code) {
stError("failed to close snapshot fd");
}
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later
int32_t code = 0;
SArray* pDbSnapSet = NULL;
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
if (pSnapInfoSet == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -374,15 +423,13 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (code != 0) {
stError("failed to do task db snap info, reason:%s", tstrerror(code));
taosArrayDestroy(pSnapInfoSet);
return code;
goto _err;
}
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
if (pDbSnapSet == NULL) {
taosArrayDestroy(pSnapInfoSet);
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
goto _err;
}
for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
@ -391,16 +438,24 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SBackendSnapFile2 snapFile = {0};
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile);
void* p = taosArrayPush(pDbSnapSet, &snapFile);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
pHandle->pDbSnapSet = pDbSnapSet;
pHandle->pSnapInfoSet = pSnapInfoSet;
pHandle->currIdx = 0;
pHandle->pMeta = pMeta;
return 0;
return code;
_err:
taosArrayDestroy(pSnapInfoSet);
taosArrayDestroy(pDbSnapSet);
streamSnapHandleDestroy(pHandle);
return code;
}
@ -414,7 +469,8 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
}
taosArrayDestroy(handle->pDbSnapSet);
}
streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
(void) streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
if (handle->pSnapInfoSet) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);
@ -422,8 +478,8 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
}
taosArrayDestroy(handle->pSnapInfoSet);
}
taosMemoryFree(handle->metaPath);
return;
}
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* path, SStreamSnapReader** ppReader) {
@ -506,14 +562,22 @@ _NEXT:
item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx);
pSnapFile->offset += nread;
if (pSnapFile->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pSnapFile->fd);
code = taosCloseFile(&pSnapFile->fd);
if (code) {
stError("failed to close snapshot fd");
}
pSnapFile->offset = 0;
pSnapFile->currFileIdx += 1;
}
} else {
stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
pSnapFile->currFileIdx);
taosCloseFile(&pSnapFile->fd);
code = taosCloseFile(&pSnapFile->fd);
if (code) {
stError("failed to close snapshot fd");
}
pSnapFile->offset = 0;
pSnapFile->currFileIdx += 1;
@ -577,14 +641,22 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
if (pHandle->pDbSnapSet == NULL) {
streamSnapWriterClose(pWriter, 0);
int32_t c = streamSnapWriterClose(pWriter, 0); // not override the error code, and igore this error code
if (c) {
stError("failed close snaphost writer");
}
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
SBackendSnapFile2 snapFile = {0};
if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) {
streamSnapWriterClose(pWriter, 0);
int32_t c = streamSnapWriterClose(pWriter, 0);
if (c) {
stError("failed close snaphost writer");
}
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
@ -614,46 +686,62 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
pHdr->name, tstrerror(code));
}
}
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code;
goto _err;
} else {
stInfo("succ to write data %s", pItem->name);
}
pSnapFile->offset += bytes;
} else {
taosCloseFile(&pSnapFile->fd);
code = taosCloseFile(&pSnapFile->fd);
if (code) {
stError("failed to close snapshot fd");
}
pSnapFile->offset = 0;
pSnapFile->currFileIdx += 1;
SBackendFileItem item = {0};
item.name = taosStrdup(pHdr->name);
item.type = pHdr->type;
if (item.name == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pSnapFile->pFileList, &item);
void* p = taosArrayPush(pSnapFile->pFileList, &item);
if (p == NULL) { // can NOT goto _err here.
return TSDB_CODE_OUT_OF_MEMORY;
}
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
SBackendFileItem* pItem2 = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem2->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pSnapFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code));
return code;
}
// open fd again, let's close fd during handle errors.
if (taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset) != pHdr->size) {
code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code;
goto _err;
}
stInfo("succ to write data %s", pItem->name);
stInfo("succ to write data %s", pItem2->name);
pSnapFile->offset += pHdr->size;
}
code = 0;
_EXIT:
return TSDB_CODE_SUCCESS;
_err:
(void) taosCloseFile(&pSnapFile->fd);
return code;
}
@ -688,7 +776,10 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
item.name = taosStrdup((char*)ROCKSDB_CURRENT);
item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(pDbSnapFile->pFileList, &item);
void* p = taosArrayPush(pDbSnapFile->pFileList, &item);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pDbSnapFile->inited = 1;
return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile);
@ -697,9 +788,12 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile);
} else {
SBackendSnapFile2 snapFile = {0};
taosArrayPush(pHandle->pDbSnapSet, &snapFile);
pHandle->currIdx += 1;
void* p = taosArrayPush(pHandle->pDbSnapSet, &snapFile);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pHandle->currIdx += 1;
return streamSnapWrite(pWriter, pData, nData);
}
}

View File

@ -29,19 +29,19 @@ typedef struct SLaunchHTaskInfo {
STaskId hTaskId;
} SLaunchHTaskInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId,
int32_t hTaskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask);
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask);
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static void initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId,
SLaunchHTaskInfo** pInfo);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask);
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask);
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
@ -65,22 +65,19 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
SStreamScanHistoryReq req;
int32_t code = 0;
initScanHistoryReq(pTask, &req, igUntreated);
int32_t len = sizeof(SStreamScanHistoryReq);
void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) {
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
/*ASSERT(0);*/
}
return 0;
return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg);
}
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
@ -109,8 +106,8 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
pTask->schedHistoryInfo.pTimer =
taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer);
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
}
return TSDB_CODE_SUCCESS;
@ -135,9 +132,19 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask);
code = streamTaskSetReady(pTask);
if (code) {
stError("s-task:%s failed to set task status ready", id);
return code;
}
code = streamTaskSetRangeStreamCalc(pTask);
if (code) {
stError("s-task:%s failed to set the time range for stream task", id);
return code;
}
SStreamTaskState p = streamTaskGetStatus(pTask);
ASSERT(p.state == TASK_STATUS__READY);
@ -155,19 +162,23 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p.name, schedStatus);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
// set the state to be ready
streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask);
int32_t code = streamTaskSetReady(pTask);
if (code == 0) {
code = streamTaskSetRangeStreamCalc(pTask);
}
SStreamTaskState p = streamTaskGetStatus(pTask);
ASSERT((p.state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1));
if (code == 0) {
SStreamTaskState p = streamTaskGetStatus(pTask);
ASSERT((p.state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1));
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p.name);
streamTaskStartScanHistory(pTask);
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p.name);
code = streamTaskStartScanHistory(pTask);
}
// NOTE: there will be an deadlock if launch fill history here.
// start the related fill-history task, when current task is ready
@ -175,7 +186,7 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
// streamLaunchFillHistoryTask(pTask);
// }
return TSDB_CODE_SUCCESS;
return code;
}
// common
@ -212,8 +223,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus.name);
(void) streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code
return streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
@ -257,12 +267,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
void initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId;
pReq->igUntreated = igUntreated;
return 0;
}
void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
@ -281,7 +290,10 @@ void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
}
// check if downstream tasks have been ready
streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
int32_t code = streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
if (code) {
stError("s-task:%s handle event init_scanhist failed", pTask->id.idStr);
}
}
void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
@ -316,7 +328,8 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
}
}
@ -367,7 +380,8 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
streamMetaReleaseTask(pMeta, pTask);
return;
}
@ -417,21 +431,21 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
taosMemoryFree(pInfo);
}
SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId) {
SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId,
SLaunchHTaskInfo** pInfo) {
*pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
if ((*pInfo) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInfo->id.streamId = pTaskId->streamId;
pInfo->id.taskId = pTaskId->taskId;
(*pInfo)->id.streamId = pTaskId->streamId;
(*pInfo)->id.taskId = pTaskId->taskId;
pInfo->hTaskId.streamId = hStreamId;
pInfo->hTaskId.taskId = hTaskId;
(*pInfo)->hTaskId.streamId = hStreamId;
(*pInfo)->hTaskId.taskId = hTaskId;
pInfo->pMeta = pMeta;
return pInfo;
(*pInfo)->pMeta = pMeta;
return TSDB_CODE_SUCCESS;
}
int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
@ -440,16 +454,18 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
SLaunchHTaskInfo* pInfo = NULL;
ASSERT(hTaskId != 0);
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId);
STaskId id = streamTaskGetTaskId(pTask);
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId);
if (pInfo == NULL) {
STaskId id = streamTaskGetTaskId(pTask);
int32_t code = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId, &pInfo);
if (code) {
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
(void) streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return terrno;
(void)streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return code;
}
// set the launch time info
@ -475,7 +491,8 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
} else { // timer exists
ASSERT(pTask->status.timerActive >= 1);
stDebug("s-task:%s set timer active flag, task timer not null", idStr);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer);
streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
}
return TSDB_CODE_SUCCESS;
@ -510,7 +527,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
}
}
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -523,10 +540,12 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
"window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
return TSDB_CODE_SUCCESS;
} else {
ASSERT(pTask->info.fillHistory == 0);
if (pTask->info.taskLevel >= TASK_LEVEL__AGG) {
return;
return TSDB_CODE_SUCCESS;
}
stDebug("s-task:%s level:%d related fill-history task exists, stream task timeWindow:%" PRId64 " - %" PRId64
@ -536,7 +555,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
SVersionRange verRange = pRange->range;
STimeWindow win = pRange->window;
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
return streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}
}
@ -554,7 +573,10 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
if (code) {
stError("s-task:%s async start history task failed", pTask->id.idStr);
}
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
@ -563,18 +585,26 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer);
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
}
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
int32_t code = 0;
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
code = streamSetParamForScanHistory(pTask);
if (code) {
return code;
}
}
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code;
code = streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
if (code) {
return code;
}
return streamStartScanHistoryAsync(pTask, 0);
}

View File

@ -29,20 +29,20 @@ static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->info.selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
void* p = taosArrayPush(pArray, &pTask);
return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS;
}
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
epsetToStr(pEpSet, buf, tListLen(buf));
(void)epsetToStr(pEpSet, buf, tListLen(buf));
if (!isEqual) {
(*pUpdated) = true;
char tmp[512] = {0};
epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));
(void) epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors
epsetAssign(&pTask->info.epSet, pEpSet);
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
@ -127,7 +127,10 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
if (code) {
return code;
}
if (fillHistory) {
ASSERT(hasFillhistory);
@ -135,7 +138,7 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
epsetAssign(&(pTask->info.mnodeEpset), pEpset);
addToTaskset(pTaskList, pTask);
code = addToTaskset(pTaskList, pTask);
*p = pTask;
return code;
@ -221,17 +224,17 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
if (pTask->schedInfo.pDelayTimer != NULL) {
taosTmrStop(pTask->schedInfo.pDelayTimer);
(void) taosTmrStop(pTask->schedInfo.pDelayTimer);
pTask->schedInfo.pDelayTimer = NULL;
}
if (pTask->hTaskInfo.pTimer != NULL) {
/*bool ret = */ taosTmrStop(pTask->hTaskInfo.pTimer);
(void) taosTmrStop(pTask->hTaskInfo.pTimer);
pTask->hTaskInfo.pTimer = NULL;
}
if (pTask->msgInfo.pRetryTmr != NULL) {
/*bool ret = */ taosTmrStop(pTask->msgInfo.pRetryTmr);
(void) taosTmrStop(pTask->msgInfo.pRetryTmr);
pTask->msgInfo.pRetryTmr = NULL;
}
@ -394,10 +397,12 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return terrno;
}
taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
if (code) {
return code;
}
TdThreadMutexAttr attr = {0};
code = taosThreadMutexAttrInit(&attr);
if (code != 0) {
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
@ -410,8 +415,16 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return code;
}
taosThreadMutexInit(&pTask->lock, &attr);
taosThreadMutexAttrDestroy(&attr);
code = taosThreadMutexInit(&pTask->lock, &attr);
if (code) {
return code;
}
code = taosThreadMutexAttrDestroy(&attr);
if (code) {
return code;
}
streamTaskOpenAllUpstreamInput(pTask);
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
@ -424,7 +437,11 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
if (code) {
return code;
}
pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr,
@ -474,13 +491,13 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
return TSDB_CODE_SUCCESS;
void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS;
}
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
epsetToStr(pEpSet, buf, tListLen(buf));
(void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file.
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) {
@ -491,7 +508,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
*pUpdated = true;
char tmp[512] = {0};
epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
(void) epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
@ -526,7 +543,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
epsetToStr(pEpSet, buf, tListLen(buf));
(void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files.
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;
@ -542,7 +559,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
if (!isEqual) {
*pUpdated = true;
char tmp[512] = {0};
epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
(void) epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
@ -562,7 +579,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
*pUpdated = true;
char tmp[512] = {0};
epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
(void) epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
@ -580,8 +597,16 @@ int32_t streamTaskStop(SStreamTask* pTask) {
int64_t st = taosGetTimestampMs();
const char* id = pTask->id.idStr;
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
if (code) {
stError("failed to handle STOP event, s-task:%s", id);
}
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
if (code) {
stError("s-task:%s failed to kill task related query handle", id);
}
while (!streamTaskIsIdle(pTask)) {
stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
pTask->info.taskLevel);
@ -590,7 +615,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
int64_t el = taosGetTimestampMs() - st;
stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
return 0;
return code;
}
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
@ -607,7 +632,10 @@ bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
bool updated = false;
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
if (code) {
stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
}
}
return updated;
@ -704,10 +732,11 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
}
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
int32_t code = 0;
SStreamMeta* pMeta = pTask->pMeta;
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
if (pTask->info.fillHistory == 0) {
return TSDB_CODE_SUCCESS;
return code;
}
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
@ -725,11 +754,11 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
(*ppStreamTask)->status.taskStatus = TASK_STATUS__READY;
}
streamMetaSaveTask(pMeta, *ppStreamTask);
code = streamMetaSaveTask(pMeta, *ppStreamTask);
streamMutexUnlock(&(*ppStreamTask)->lock);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
@ -797,8 +826,7 @@ int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpoin
initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
}
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
@ -880,6 +908,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t code = 0;
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
@ -887,15 +916,15 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor;
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
}
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
return TSDB_CODE_SUCCESS;
return code;
}
void streamTaskPause(SStreamTask* pTask) {
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
(void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
}
void streamTaskResume(SStreamTask* pTask) {
@ -949,8 +978,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
}
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
@ -1044,7 +1072,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
if (pTransId != NULL) {
*pTransId = pTask->chkInfo.pActiveInfo->transId;
}
@ -1052,8 +1080,6 @@ int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTr
if (pCheckpointId != NULL) {
*pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
}
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
@ -1084,7 +1110,7 @@ int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
*pRes = pInfo;
return TSDB_CODE_SUCCESS;
return code;
}
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
@ -1101,12 +1127,12 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
pInfo->pCheckpointReadyRecvList = NULL;
if (pInfo->pChkptTriggerTmr != NULL) {
taosTmrStop(pInfo->pChkptTriggerTmr);
(void) taosTmrStop(pInfo->pChkptTriggerTmr);
pInfo->pChkptTriggerTmr = NULL;
}
if (pInfo->pSendReadyMsgTmr != NULL) {
taosTmrStop(pInfo->pSendReadyMsgTmr);
(void) taosTmrStop(pInfo->pSendReadyMsgTmr);
pInfo->pSendReadyMsgTmr = NULL;
}

View File

@ -35,8 +35,9 @@ void streamTimerCleanUp() {
streamTimer = NULL;
}
tmr_h streamTimerGetInstance() {
return streamTimer;
int32_t streamTimerGetInstance(tmr_h* pTmr) {
*pTmr = streamTimer;
return TSDB_CODE_SUCCESS;
}
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,

View File

@ -36,7 +36,6 @@ static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_
int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) {
return compareInt64Val(pTs1, pTs2);
;
}
int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) {