Merge pull request #26750 from taosdata/fix/refactorTqBackend

Fix/refactorTqBackend
This commit is contained in:
Hongze Cheng 2024-07-29 14:35:31 +08:00 committed by GitHub
commit b0d28315ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 140 additions and 135 deletions

View File

@ -39,13 +39,15 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
// alloc // alloc
pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader));
if (pReader == NULL) { if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY);
goto _err;
} }
pReader->pTq = pTq; pReader->pTq = pTq;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair)); pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair));
if (pReader->tdbTbList == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err);
}
STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK}; STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK};
taosArrayPush(pReader->tdbTbList, &pair1); taosArrayPush(pReader->tdbTbList, &pair1);
@ -60,16 +62,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
if (code) { if (code) {
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode), tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode),
tstrerror(code)); tstrerror(code));
taosMemoryFree(pReader); TAOS_CHECK_GOTO(code, NULL, _err);
goto _err;
} }
code = tdbTbcMoveToFirst(pReader->pCur); code = tdbTbcMoveToFirst(pReader->pCur);
if (code) { if (code) {
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode), tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode),
tstrerror(code)); tstrerror(code));
taosMemoryFree(pReader); TAOS_CHECK_GOTO(code, NULL, _err);
goto _err;
} }
tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
@ -79,11 +79,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
_err: _err:
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
streamTaskSnapReaderClose(pReader);
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
if (pReader == NULL) return 0;
int32_t code = 0; int32_t code = 0;
tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode)); tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode));
taosArrayDestroy(pReader->tdbTbList); taosArrayDestroy(pReader->tdbTbList);
@ -116,6 +119,10 @@ NextTbl:
break; break;
} else { } else {
pVal = taosMemoryCalloc(1, tLen); pVal = taosMemoryCalloc(1, tLen);
if (pVal == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(pVal, tVal, tLen); memcpy(pVal, tVal, tLen);
vLen = tLen; vLen = tLen;
} }
@ -174,8 +181,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
// alloc // alloc
pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY);
goto _err;
} }
pWriter->pTq = pTq; pWriter->pTq = pTq;
pWriter->sver = sver; pWriter->sver = sver;
@ -184,12 +190,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
*ppWriter = pWriter; *ppWriter = pWriter;
tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
return code; return code;
_err:
tqError("vgId:%d, vnode stream-task snapshot writer failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
return 0;
} }
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
@ -207,8 +207,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
if (code) goto _err; if (code) goto _err;
} }
if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { if ((code = tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0)) < 0) {
code = -1;
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
goto _err; goto _err;
} }
@ -241,10 +240,11 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
int64_t key[2] = {taskId.streamId, taskId.taskId}; int64_t key[2] = {taskId.streamId, taskId.taskId};
streamMetaWLock(pTq->pStreamMeta); streamMetaWLock(pTq->pStreamMeta);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), if ((code =
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn)) < 0) {
streamMetaWUnLock(pTq->pStreamMeta); streamMetaWUnLock(pTq->pStreamMeta);
return -1; return code;
} }
streamMetaWUnLock(pTq->pStreamMeta); streamMetaWUnLock(pTq->pStreamMeta);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {

View File

@ -143,9 +143,9 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer); int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb);
void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend); void taskDbDestroy2(void* pBackend);
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
@ -252,7 +252,7 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId);
SBkdMgt* bkdMgtCreate(char* path); int32_t bkdMgtCreate(char* path, SBkdMgt **bm);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);

View File

@ -2525,35 +2525,35 @@ _EXIT:
return NULL; return NULL;
} }
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) { int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) {
char* statePath = NULL; char* statePath = NULL;
char* dbPath = NULL; char* dbPath = NULL;
int code = 0; int code = 0;
terrno = 0;
if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) { if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) {
terrno = code;
stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key,
chkptId, tstrerror(terrno)); chkptId, tstrerror(code));
return NULL; return code;
} }
STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
if (pTaskDb != NULL) { if (pTaskDb != NULL) {
int64_t chkpId = -1, ver = -1; int64_t chkpId = -1, ver = -1;
if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) { if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver)) == 0) {
*processVer = ver; *processVer = ver;
} else { } else {
terrno = code;
stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId,
tstrerror(terrno)); tstrerror(code));
taskDbDestroy(pTaskDb, false); taskDbDestroy(pTaskDb, false);
return NULL; return code;
} }
} else {
code = TSDB_CODE_INVALID_PARA;
} }
taosMemoryFree(dbPath); taosMemoryFree(dbPath);
taosMemoryFree(statePath); taosMemoryFree(statePath);
return pTaskDb; *ppTaskDb = pTaskDb;
return code;
} }
void taskDbDestroy(void* pDb, bool flush) { void taskDbDestroy(void* pDb, bool flush) {
@ -2794,8 +2794,10 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
int32_t code = 0; int32_t code = 0;
int64_t processVer = -1; int64_t processVer = -1;
STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer); STaskDbWrapper* pTaskDb = NULL;
RocksdbCfInst* pSrcBackend = pCfInst;
code = taskDbOpen(path, key, 0, &processVer, &pTaskDb);
RocksdbCfInst* pSrcBackend = pCfInst;
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
@ -4626,10 +4628,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
void dbChkpDestroy(SDbChkp* pChkp); void dbChkpDestroy(SDbChkp* pChkp);
SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
int32_t code = 0;
SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
if (p == NULL) { if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
@ -4637,41 +4640,41 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
p->preCkptId = -1; p->preCkptId = -1;
p->pSST = taosArrayInit(64, sizeof(void*)); p->pSST = taosArrayInit(64, sizeof(void*));
if (p->pSST == NULL) { if (p->pSST == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
dbChkpDestroy(p); dbChkpDestroy(p);
return NULL; return code;
} }
p->path = path; p->path = path;
p->len = strlen(path) + 128; p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len); p->buf = taosMemoryCalloc(1, p->len);
if (p->buf == NULL) { if (p->buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->idx = 0; p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (p->pSstTbl[0] == NULL) { if (p->pSstTbl[0] == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (p->pSstTbl[1] == NULL) { if (p->pSstTbl[1] == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->pAdd = taosArrayInit(64, sizeof(void*)); p->pAdd = taosArrayInit(64, sizeof(void*));
if (p->pAdd == NULL) { if (p->pAdd == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->pDel = taosArrayInit(64, sizeof(void*)); p->pDel = taosArrayInit(64, sizeof(void*));
if (p->pDel == NULL) { if (p->pDel == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
@ -4679,15 +4682,15 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
taosThreadRwlockInit(&p->rwLock, NULL); taosThreadRwlockInit(&p->rwLock, NULL);
SArray* list = NULL; SArray* list = NULL;
int32_t code = dbChkpGetDelta(p, initChkpId, list); code = dbChkpGetDelta(p, initChkpId, list);
if (code != 0) { if (code != 0) {
goto _EXIT; goto _EXIT;
} }
*ppChkp = p;
return p; return code;
_EXIT: _EXIT:
dbChkpDestroy(p); dbChkpDestroy(p);
return NULL; return code;
} }
void dbChkpDestroy(SDbChkp* pChkp) { void dbChkpDestroy(SDbChkp* pChkp) {
@ -4880,35 +4883,36 @@ _ERROR:
return code; return code;
} }
SBkdMgt* bkdMgtCreate(char* path) { int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) {
terrno = 0; int32_t code = 0;
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
if (p == NULL) { if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return code;
} }
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (p->pDbChkpTbl == NULL) { if (p->pDbChkpTbl == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
bkdMgtDestroy(p); bkdMgtDestroy(p);
return NULL; return code;
} }
p->path = taosStrdup(path); p->path = taosStrdup(path);
if (p->path == NULL) { if (p->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
bkdMgtDestroy(p); bkdMgtDestroy(p);
return NULL; return code;
} }
if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) { if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
bkdMgtDestroy(p); bkdMgtDestroy(p);
return NULL; return code;
} }
*mgt = p;
return p; return code;
} }
void bkdMgtDestroy(SBkdMgt* bm) { void bkdMgtDestroy(SBkdMgt* bm) {
@ -4949,11 +4953,11 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
return code; return code;
} }
SDbChkp* p = dbChkpCreate(path, chkpId); SDbChkp* p = NULL;
if (p == NULL) { code = dbChkpCreate(path, chkpId, &p);
if (code != 0) {
taosMemoryFree(path); taosMemoryFree(path);
taosThreadRwlockUnlock(&bm->rwLock); taosThreadRwlockUnlock(&bm->rwLock);
code = terrno;
return code; return code;
} }
@ -4986,8 +4990,9 @@ int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
taosThreadRwlockWrlock(&bm->rwLock); taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
if (pp == NULL) { if (pp == NULL) {
SDbChkp* p = dbChkpCreate(path, 0); SDbChkp* p = NULL;
if (p != NULL) { code = dbChkpCreate(path, 0, &p);
if (code != 0) {
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*)); taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
code = 0; code = 0;
} }

View File

@ -68,12 +68,12 @@ static void streamMetaEnvInit() {
} }
} }
void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { void streamMetaCleanup() {
(void) taosCloseRef(streamBackendId); (void)taosCloseRef(streamBackendId);
(void) taosCloseRef(streamBackendCfWrapperId); (void)taosCloseRef(streamBackendCfWrapperId);
(void) taosCloseRef(streamMetaId); (void)taosCloseRef(streamMetaId);
metaRefMgtCleanup(); metaRefMgtCleanup();
streamTimerCleanUp(); streamTimerCleanUp();
@ -128,12 +128,12 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
if (code) { if (code) {
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t) vgId, *rid); stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid);
return code; return code;
} }
} else { } else {
SArray* list = *(SArray**)p; SArray* list = *(SArray**)p;
void* px = taosArrayPush(list, &rid); void* px = taosArrayPush(list, &rid);
if (px == NULL) { if (px == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
@ -186,7 +186,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
code = tdbTbcMoveToFirst(pCur); code = tdbTbcMoveToFirst(pCur);
if (code) { if (code) {
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId); stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId);
return ret; return ret;
} }
@ -215,7 +215,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
return ret; return ret;
} }
@ -276,6 +276,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
} }
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) { int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) {
int32_t code = 0;
int64_t chkpId = pTask->chkInfo.checkpointId; int64_t chkpId = pTask->chkInfo.checkpointId;
streamMutexLock(&pMeta->backendMutex); streamMutexLock(&pMeta->backendMutex);
@ -299,8 +300,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
STaskDbWrapper* pBackend = NULL; STaskDbWrapper* pBackend = NULL;
int64_t processVer = -1; int64_t processVer = -1;
while (1) { while (1) {
pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer); code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend);
if (pBackend != NULL) { if (code == 0) {
break; break;
} }
@ -319,7 +320,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
if (processVer != -1) pTask->chkInfo.processedVer = processVer; if (processVer != -1) pTask->chkInfo.processedVer = processVer;
int32_t code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
if (code) { if (code) {
stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId); stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
} }
@ -469,8 +470,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
pMeta->bkdChkptMgt = bkdMgtCreate(tpath); code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
if (pMeta->bkdChkptMgt == NULL) { if (code != 0) {
goto _err; goto _err;
} }
@ -485,7 +486,7 @@ _err:
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) (void) tdbClose(pMeta->db); if (pMeta->db) (void)tdbClose(pMeta->db);
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
@ -531,7 +532,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
// release the ref by timer // release the ref by timer
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
(void) taosTmrStop(p->schedInfo.pDelayTimer); (void)taosTmrStop(p->schedInfo.pDelayTimer);
p->info.delaySchedParam = 0; p->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, p); streamMetaReleaseTask(pMeta, p);
} }
@ -566,7 +567,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
} }
(void) taosRemoveRef(streamMetaId, pMeta->rid); (void)taosRemoveRef(streamMetaId, pMeta->rid);
} }
void streamMetaCloseImpl(void* arg) { void streamMetaCloseImpl(void* arg) {
@ -583,10 +584,10 @@ void streamMetaCloseImpl(void* arg) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
// already log the error, ignore here // already log the error, ignore here
(void) tdbAbort(pMeta->db, pMeta->txn); (void)tdbAbort(pMeta->db, pMeta->txn);
(void) tdbTbClose(pMeta->pTaskDb); (void)tdbTbClose(pMeta->pTaskDb);
(void) tdbTbClose(pMeta->pCheckpointDb); (void)tdbTbClose(pMeta->pCheckpointDb);
(void) tdbClose(pMeta->db); (void)tdbClose(pMeta->db);
taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->pTaskList);
taosArrayDestroy(pMeta->chkpSaved); taosArrayDestroy(pMeta->chkpSaved);
@ -610,7 +611,7 @@ void streamMetaCloseImpl(void* arg) {
bkdMgtDestroy(pMeta->bkdChkptMgt); bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT; pMeta->role = NODE_ROLE_UNINIT;
(void) taosThreadRwlockDestroy(&pMeta->lock); (void)taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
stDebug("vgId:%d end to close stream meta", vgId); stDebug("vgId:%d end to close stream meta", vgId);
@ -691,13 +692,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
p = taosArrayPush(pMeta->pTaskList, &pTask->id); p = taosArrayPush(pMeta->pTaskList, &pTask->id);
if (p == NULL) { if (p == NULL) {
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (code) { if (code) {
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
return code; return code;
} }
@ -710,7 +711,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
*pAdded = true; *pAdded = true;
@ -779,7 +780,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
(void) streamTaskSendCheckpointSourceRsp(pTask); (void)streamTaskSendCheckpointSourceRsp(pTask);
} }
return 0; return 0;
} }
@ -802,7 +803,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
} }
// handle the dropping event // handle the dropping event
(void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
} else { } else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -841,12 +842,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
pTask = *ppTask; pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it // it is an fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
(void) atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
(void) taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
(void) streamMetaRemoveTask(pMeta, &id); (void)streamMetaRemoveTask(pMeta, &id);
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -854,7 +855,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
(void) taosTmrStop(pTask->schedInfo.pDelayTimer); (void)taosTmrStop(pTask->schedInfo.pDelayTimer);
pTask->info.delaySchedParam = 0; pTask->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
@ -915,7 +916,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
code = tdbTbcMoveToFirst(pCur); code = tdbTbcMoveToFirst(pCur);
if (code) { if (code) {
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId); stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
return checkpointId; return checkpointId;
} }
@ -953,7 +954,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
SDecoder decoder; SDecoder decoder;
int32_t vgId = 0; int32_t vgId = 0;
int32_t code = 0; int32_t code = 0;
SArray* pRecycleList = NULL; SArray* pRecycleList = NULL;
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
@ -975,7 +976,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (code) { if (code) {
stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList); taosArrayDestroy(pRecycleList);
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
return; return;
} }
@ -1008,7 +1009,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
STaskId id = streamTaskGetTaskId(pTask); STaskId id = streamTaskGetTaskId(pTask);
(void) taosArrayPush(pRecycleList, &id); (void)taosArrayPush(pRecycleList, &id);
int32_t total = taosArrayGetSize(pRecycleList); int32_t total = taosArrayGetSize(pRecycleList);
stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
@ -1029,7 +1030,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
(void) taosArrayPush(pMeta->pTaskList, &pTask->id); (void)taosArrayPush(pMeta->pTaskList, &pTask->id);
} else { } else {
// todo this should replace the existed object put by replay creating stream task msg from mnode // todo this should replace the existed object put by replay creating stream task msg from mnode
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
@ -1039,17 +1040,17 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) {
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
(void) taosArrayPop(pMeta->pTaskList); (void)taosArrayPop(pMeta->pTaskList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
continue; continue;
} }
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
(void) atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
} }
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
@ -1065,7 +1066,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosArrayGetSize(pRecycleList) > 0) { if (taosArrayGetSize(pRecycleList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
STaskId* pId = taosArrayGet(pRecycleList, i); STaskId* pId = taosArrayGet(pRecycleList, i);
(void) streamMetaRemoveTask(pMeta, pId); (void)streamMetaRemoveTask(pMeta, pId);
} }
} }
@ -1093,7 +1094,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive >= 1) { if (pTask->status.timerActive >= 1) {
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
(void) streamTaskStop(pTask); (void)streamTaskStop(pTask);
inTimer = true; inTimer = true;
} }
} }
@ -1126,7 +1127,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
(void) streamTaskStop(pTask); (void)streamTaskStop(pTask);
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -1173,7 +1174,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId); // stTrace("vgId:%d meta-rlock", pMeta->vgId);
(void) taosThreadRwlockRdlock(&pMeta->lock); (void)taosThreadRwlockRdlock(&pMeta->lock);
} }
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
@ -1188,13 +1189,13 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId); // stTrace("vgId:%d meta-wlock", pMeta->vgId);
(void) taosThreadRwlockWrlock(&pMeta->lock); (void)taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); // stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
} }
void streamMetaWUnLock(SStreamMeta* pMeta) { void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId); // stTrace("vgId:%d meta-wunlock", pMeta->vgId);
(void) taosThreadRwlockUnlock(&pMeta->lock); (void)taosThreadRwlockUnlock(&pMeta->lock);
} }
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
@ -1320,7 +1321,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue; continue;
} }
@ -1343,7 +1344,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue; continue;
} }
@ -1361,10 +1362,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr); pTask->id.idStr);
(void) streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
} }
(void) streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true); (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
true);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
continue; continue;
} }
@ -1420,14 +1422,14 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
} }
(void) streamTaskStop(pTask); (void)streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
@ -1467,7 +1469,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId);
(void) streamMetaAddFailedTask(pMeta, streamId, taskId); (void)streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }
@ -1558,9 +1560,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) { if (code) {
} }
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
@ -1632,9 +1633,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list. // add the failed task info, along with the related fill-history task info into tasks list.
(void) streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); (void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) { if (hasFillhistoryTask) {
(void) streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); (void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
} }
} else { } else {
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
@ -1649,12 +1650,12 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
int32_t startTs = pTask->execInfo.checkTs; int32_t startTs = pTask->execInfo.checkTs;
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
// automatically set the related fill-history task to be failed. // automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id; STaskId* pId = &pTask->hTaskInfo.id;
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
} }
} }
@ -1662,7 +1663,7 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
int64_t startTs) { int64_t startTs) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0; int32_t code = 0;
// keep the already updated info // keep the already updated info
STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId}; STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};

View File

@ -69,7 +69,7 @@ void *backendOpen() {
key.ts = ts; key.ts = ts;
const char *val = "value data"; const char *val = "value data";
int32_t vlen = strlen(val); int32_t vlen = strlen(val);
int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen); int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen);
ASSERT(code == 0); ASSERT(code == 0);
tsArray.push_back(ts); tsArray.push_back(ts);
@ -83,7 +83,7 @@ void *backendOpen() {
const char *val = "value data"; const char *val = "value data";
int32_t len = 0; int32_t len = 0;
char *newVal = NULL; char *newVal = NULL;
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(code == 0); ASSERT(code == 0);
ASSERT(len == strlen(val)); ASSERT(len == strlen(val));
@ -377,7 +377,7 @@ TEST_F(BackendEnv, checkOpen) {
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000); (int32_t)(strlen(val)), tsStart + 100000);
ASSERT(code == 0); ASSERT(code == 0);
} }
@ -396,7 +396,7 @@ TEST_F(BackendEnv, checkOpen) {
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
ASSERT(code == 0); ASSERT(code == 0);
} }
int32_t code = streamStatePutBatch_rocksdb(p, pBatch); int32_t code = streamStatePutBatch_rocksdb(p, pBatch);
@ -417,7 +417,7 @@ TEST_F(BackendEnv, checkOpen) {
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
ASSERT(code == 0); ASSERT(code == 0);
} }
code = streamStatePutBatch_rocksdb(p, pBatch); code = streamStatePutBatch_rocksdb(p, pBatch);
@ -432,13 +432,12 @@ TEST_F(BackendEnv, checkOpen) {
const char *path = "/tmp/backend/stream"; const char *path = "/tmp/backend/stream";
const char *dump = "/tmp/backend/stream/dump"; const char *dump = "/tmp/backend/stream/dump";
// taosMkDir(dump); // taosMkDir(dump);
code = taosMulMkDir(dump); taosMulMkDir(dump);
ASSERT(code == 0); SBkdMgt *mgt = NULL;
SBkdMgt *mgt = bkdMgtCreate((char *)path); code = bkdMgtCreate((char *)path, &mgt);
SArray *result = taosArrayInit(4, sizeof(void *)); SArray *result = taosArrayInit(4, sizeof(void *));
code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
ASSERT(code == 0);
code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0);
ASSERT(code == 0); ASSERT(code == 0);
@ -475,7 +474,7 @@ TEST_F(BackendEnv, backendUtil) {
} }
TEST_F(BackendEnv, oldBackendInit) { TEST_F(BackendEnv, oldBackendInit) {
const char *path = "/tmp/backend1"; const char *path = "/tmp/backend1";
int32_t code = taosMulMkDir(path); int32_t code = taosMulMkDir(path);
ASSERT(code == 0); ASSERT(code == 0);
{ {

View File

@ -53,7 +53,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
void *p = NULL; void *p = NULL;
// SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2);
// p = taskDbOpen((char *)streamPath, (char *)"test", -1); // p = taskDbOpen((char *)streamPath, (char *)"test", -1);
p = bkdMgtCreate((char *)streamPath); int32_t code = bkdMgtCreate((char *)streamPath, (SBkdMgt **)&p);
// const int64_t interval = 20 * 1000; // const int64_t interval = 20 * 1000;
// const int64_t watermark = 10 * 60 * 1000; // const int64_t watermark = 10 * 60 * 1000;