refactor(stream): use vnode meta
This commit is contained in:
parent
6d67b17113
commit
bb6fabcc11
|
@ -56,7 +56,6 @@ enum {
|
|||
STREAM_INPUT__DATA_SUBMIT = 1,
|
||||
STREAM_INPUT__DATA_BLOCK,
|
||||
STREAM_INPUT__MERGED_SUBMIT,
|
||||
// STREAM_INPUT__TABLE_SCAN,
|
||||
STREAM_INPUT__TQ_SCAN,
|
||||
STREAM_INPUT__DATA_RETRIEVE,
|
||||
STREAM_INPUT__GET_RES,
|
||||
|
|
|
@ -41,7 +41,7 @@ extern "C" {
|
|||
#define WAL_REFRESH_MS 1000
|
||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||
#define WAL_MAGIC 0xFAFBFCFDULL
|
||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -79,6 +79,9 @@ struct SMeta {
|
|||
TTB* pTtlIdx;
|
||||
|
||||
TTB* pSmaIdx;
|
||||
|
||||
TTB* pTaskIdx;
|
||||
|
||||
SMetaIdx* pIdx;
|
||||
};
|
||||
|
||||
|
|
|
@ -152,7 +152,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
|||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
|
|
@ -22,6 +22,7 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
|||
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
||||
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
||||
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
||||
|
@ -130,6 +131,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx);
|
||||
if (ret < 0) {
|
||||
metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open index
|
||||
if (metaOpenIdx(pMeta) < 0) {
|
||||
metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
|
@ -143,6 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
|
||||
_err:
|
||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
|
||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||
|
@ -162,6 +170,7 @@ _err:
|
|||
int metaClose(SMeta *pMeta) {
|
||||
if (pMeta) {
|
||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
|
||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||
|
@ -377,3 +386,16 @@ static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||
int32_t uid1 = *(int32_t *)pKey1;
|
||||
int32_t uid2 = *(int32_t *)pKey2;
|
||||
|
||||
if (uid1 > uid2) {
|
||||
return 1;
|
||||
} else if (uid1 < uid2) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -695,7 +695,7 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||
void* pIter = NULL;
|
||||
bool failed = false;
|
||||
SStreamDataSubmit* pSubmit = NULL;
|
||||
|
@ -713,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
|||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (!pTask->isDataScan) continue;
|
||||
|
||||
qDebug("data submit enqueue stream task: %d", pTask->taskId);
|
||||
qDebug("data submit enqueue stream task: %d, ver: %ld", pTask->taskId, ver);
|
||||
|
||||
if (!failed) {
|
||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
|
||||
|
|
|
@ -48,7 +48,7 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbOpen("handles", -1, -1, 0, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
||||
if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
|
|
|
@ -252,7 +252,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
SSubmitReq* pReq = (SSubmitReq*)data;
|
||||
pReq->version = ver;
|
||||
|
||||
tqProcessStreamTrigger(pTq, data);
|
||||
tqProcessStreamTrigger(pTq, data, ver);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -116,15 +116,15 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
|
||||
if (pIdxTFile == NULL) {
|
||||
if (pIdxFile == NULL) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
|
||||
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
@ -132,7 +132,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
// read idx file and get log file pos
|
||||
SWalIdxEntry entry;
|
||||
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
|
@ -140,24 +140,24 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
ASSERT(entry.ver == ver);
|
||||
|
||||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
ASSERT(0);
|
||||
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
if (pLogFile == NULL) {
|
||||
// TODO
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
// TODO
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
// validate offset
|
||||
SWalCkHead head;
|
||||
ASSERT(taosValidFile(pLogTFile));
|
||||
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
|
||||
ASSERT(taosValidFile(pLogFile));
|
||||
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
|
||||
if (size != sizeof(SWalCkHead)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
@ -180,14 +180,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
// truncate old files
|
||||
code = taosFtruncateFile(pLogTFile, entry.offset);
|
||||
code = taosFtruncateFile(pLogFile, entry.offset);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = taosFtruncateFile(pIdxTFile, idxOff);
|
||||
code = taosFtruncateFile(pIdxFile, idxOff);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -205,8 +205,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0);
|
||||
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1;
|
||||
}
|
||||
taosCloseFile(&pIdxTFile);
|
||||
taosCloseFile(&pLogTFile);
|
||||
taosCloseFile(&pIdxFile);
|
||||
taosCloseFile(&pLogFile);
|
||||
|
||||
walSaveMeta(pWal);
|
||||
|
||||
// unlock
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
|
Loading…
Reference in New Issue