refactor code
This commit is contained in:
parent
3afea998ac
commit
090515c684
|
@ -219,7 +219,7 @@ void tqNotifyClose(STQ* pTq) {
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
pMeta->killed = STREAM_META_WILL_STOP;
|
pMeta->killed = STREAM_META_WILL_STOP;
|
||||||
while(pMeta->killed != STREAM_META_OK_TO_STOP) {
|
while (pMeta->killed != STREAM_META_OK_TO_STOP) {
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
|
tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
|
||||||
}
|
}
|
||||||
|
@ -1799,8 +1799,8 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// set the initial value for generating check point
|
// set the initial value for generating check point
|
||||||
// set the mgmt epset info according to the checkout source msg from mnode, todo opt perf
|
// set the mgmt epset info according to the checkout source msg from mnode, todo opt perf
|
||||||
// pMeta->mgmtInfo.epset = req.mgmtEps;
|
// pMeta->mgmtInfo.epset = req.mgmtEps;
|
||||||
// pMeta->mgmtInfo.mnodeId = req.mnodeId;
|
// pMeta->mgmtInfo.mnodeId = req.mnodeId;
|
||||||
|
|
||||||
if (pMeta->chkptNotReadyTasks == 0) {
|
if (pMeta->chkptNotReadyTasks == 0) {
|
||||||
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
|
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
@ -1908,9 +1908,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// all tasks are closed, now let's restart the stream meta
|
// all tasks are closed, now let's restart the stream meta
|
||||||
if (pMeta->closedTask == numOfCount) {
|
if (pMeta->closedTask == numOfCount) {
|
||||||
tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId);
|
tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId);
|
||||||
// if (streamMetaCommit(pMeta) < 0) {
|
// if (streamMetaCommit(pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
// }
|
// }
|
||||||
restartTasks = true;
|
restartTasks = true;
|
||||||
pMeta->closedTask = 0; // reset value
|
pMeta->closedTask = 0; // reset value
|
||||||
} else {
|
} else {
|
||||||
|
@ -1921,7 +1921,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
// tmsgSendRsp(&rsp);
|
// tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
if (restartTasks) {
|
if (restartTasks) {
|
||||||
tqDebug("vgId:%d all tasks are stopped, restart them", vgId);
|
tqDebug("vgId:%d all tasks are stopped, restart them", vgId);
|
||||||
|
|
|
@ -13,11 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnd.h"
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||||
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId);
|
static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId);
|
||||||
|
|
||||||
// this function should be executed by stream threads.
|
// this function should be executed by stream threads.
|
||||||
// extract submit block from WAL, and add them into the input queue for the sources tasks.
|
// extract submit block from WAL, and add them into the input queue for the sources tasks.
|
||||||
|
@ -167,12 +167,12 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
|
int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) {
|
||||||
// seek the stored version and extract data from WAL
|
// seek the stored version and extract data from WAL
|
||||||
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
|
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
|
||||||
if (pTask->chkInfo.currentVer < firstVer) {
|
if (pTask->chkInfo.currentVer < firstVer) {
|
||||||
tqWarn("vgId:%d s-task:%s ver:%"PRId64" earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
|
tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
|
||||||
pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer);
|
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer);
|
||||||
|
|
||||||
pTask->chkInfo.currentVer = firstVer;
|
pTask->chkInfo.currentVer = firstVer;
|
||||||
|
|
||||||
|
@ -193,7 +193,8 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
|
||||||
|
pTask->chkInfo.currentVer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,7 +224,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
|
||||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||||
appendTranstateIntoInputQ(pTask);
|
appendTranstateIntoInputQ(pTask);
|
||||||
/*int32_t code = */streamSchedExec(pTask);
|
/*int32_t code = */ streamSchedExec(pTask);
|
||||||
} else {
|
} else {
|
||||||
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
|
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
|
||||||
id, ver, maxVer);
|
id, ver, maxVer);
|
||||||
|
@ -278,7 +279,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
ASSERT(status == TASK_STATUS__NORMAL);
|
ASSERT(status == TASK_STATUS__NORMAL);
|
||||||
// the maximum version of data in the WAL has reached already, the step2 is done
|
// the maximum version of data in the WAL has reached already, the step2 is done
|
||||||
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
||||||
pTask->dataRange.range.maxVer);
|
pTask->dataRange.range.maxVer);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -306,10 +307,10 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfItems = streamTaskGetInputQItems(pTask);
|
int32_t numOfItems = streamTaskGetInputQItems(pTask);
|
||||||
int64_t maxVer = (pTask->info.fillHistory == 1)? pTask->dataRange.range.maxVer:INT64_MAX;
|
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
|
||||||
|
|
||||||
SStreamQueueItem* pItem = NULL;
|
SStreamQueueItem* pItem = NULL;
|
||||||
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr);
|
code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr);
|
||||||
|
|
||||||
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
|
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
|
||||||
checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
|
checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
|
||||||
|
@ -321,6 +322,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
|
|
||||||
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
|
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
|
||||||
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
|
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
|
||||||
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -360,4 +362,3 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
taosArrayDestroy(pTaskList);
|
taosArrayDestroy(pTaskList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -439,7 +439,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
} else {
|
} else {
|
||||||
qInfo("succ to restart stream backend at checkpoint path: %s", chkp);
|
qInfo("start to restart stream backend at checkpoint path: %s", chkp);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -510,7 +510,11 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
|
||||||
/*
|
/*
|
||||||
list all cf and get prefix
|
list all cf and get prefix
|
||||||
*/
|
*/
|
||||||
streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
|
code = streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
|
||||||
|
if (code != 0) {
|
||||||
|
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||||
|
goto _EXIT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (cfs != NULL) {
|
if (cfs != NULL) {
|
||||||
rocksdb_list_column_families_destroy(cfs, nCf);
|
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||||
|
@ -545,16 +549,6 @@ void streamBackendCleanup(void* arg) {
|
||||||
taosHashCleanup(pHandle->cfInst);
|
taosHashCleanup(pHandle->cfInst);
|
||||||
|
|
||||||
if (pHandle->db) {
|
if (pHandle->db) {
|
||||||
// char* err = NULL;
|
|
||||||
// rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
|
||||||
// rocksdb_flushoptions_set_wait(flushOpt, 1);
|
|
||||||
// rocksdb_flush(pHandle->db, flushOpt, &err);
|
|
||||||
|
|
||||||
// if (err != NULL) {
|
|
||||||
// qError("failed to flush db before streamBackend clean up, reason:%s", err);
|
|
||||||
// taosMemoryFree(err);
|
|
||||||
// }
|
|
||||||
// rocksdb_flushoptions_destroy(flushOpt);
|
|
||||||
rocksdb_close(pHandle->db);
|
rocksdb_close(pHandle->db);
|
||||||
}
|
}
|
||||||
rocksdb_options_destroy(pHandle->dbOpt);
|
rocksdb_options_destroy(pHandle->dbOpt);
|
||||||
|
@ -1480,6 +1474,12 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to open rocksdb cf, reason:%s", err);
|
qError("failed to open rocksdb cf, reason:%s", err);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
|
taosMemoryFree(cfHandle);
|
||||||
|
taosMemoryFree(pCompare);
|
||||||
|
taosMemoryFree(params);
|
||||||
|
taosMemoryFree(cfOpts);
|
||||||
|
// fix other leak
|
||||||
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
qDebug("succ to open rocksdb cf");
|
qDebug("succ to open rocksdb cf");
|
||||||
}
|
}
|
||||||
|
@ -2851,7 +2851,7 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
qDebug("write batch to backend opt: %p", wrapper->pBackend);
|
qDebug("write batch to backend:%p", wrapper->pBackend);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,8 +107,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
||||||
if (pMeta->streamBackend == NULL) {
|
if (pMeta->streamBackend == NULL) {
|
||||||
qError("vgId:%d failed to init stream backend", pMeta->vgId);
|
qError("vgId:%d failed to init stream backend", pMeta->vgId);
|
||||||
|
qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (pMeta->streamBackend == NULL) {
|
// if (pMeta->streamBackend == NULL) {
|
||||||
// goto _err;
|
// goto _err;
|
||||||
// }
|
// }
|
||||||
|
@ -172,6 +174,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
|
||||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
||||||
if (pMeta->streamBackend == NULL) {
|
if (pMeta->streamBackend == NULL) {
|
||||||
qError("vgId:%d failed to init stream backend", pMeta->vgId);
|
qError("vgId:%d failed to init stream backend", pMeta->vgId);
|
||||||
|
qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
|
||||||
// return -1;
|
// return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue