Merge pull request #22242 from taosdata/fix/liaohj

other: merge 3.0
Haojun Liao 2023-07-29 14:54:32 +08:00 committed by GitHub
commit 6540180c90
11 changed files with 370 additions and 218 deletions

View File

@ -45,7 +45,6 @@ enum {
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scans history data by using tsdbRead in the stream scanner
TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause
};
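With TASK_STATUS__SCAN_HISTORY_WAL removed, the WAL phase of a fill-history task now runs under TASK_STATUS__NORMAL (the later hunks comment out the remaining SCAN_HISTORY_WAL checks). A minimal sketch of the simplified status predicate, with illustrative enum values rather than the real ones:

#include <stdbool.h>
#include <stdint.h>

enum { TASK_STATUS__NORMAL = 0, TASK_STATUS__SCAN_HISTORY = 5 }; /* values illustrative */

/* After this change a task processes input in exactly two states;
 * the separate SCAN_HISTORY_WAL case is gone. */
static bool taskCanProcessInput(int8_t status) {
  return status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY;
}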

View File

@ -42,7 +42,7 @@
static SDnode globalDnode = {0};
static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS",
"FreeBSD", "openSUSE", "SLES", "Fedora", "MacOS"};
"FreeBSD", "openSUSE", "SLES", "Fedora", "macOS"};
SDnode *dmInstance() { return &globalDnode; }

View File

@ -1145,7 +1145,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 3. It's a fill-history task; do nothing and wait for the main task to start it
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
if (p != NULL) { // reset the downstreamReady flag.
p->status.downstreamReady = 0;
streamTaskCheckDownstreamTasks(p);
}
@ -1154,12 +1153,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
}
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
char* msg = pMsg->pCont;
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
@ -1170,9 +1167,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step1
const char* id = pTask->id.idStr;
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);
tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
int64_t st = taosGetTimestampMs();
if (pTask->tsInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
pTask->tsInfo.step1Start = taosGetTimestampMs();
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
}
// we have to continue retrying to successfully execute the scan history task.
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
@ -1185,31 +1190,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
ASSERT(pTask->status.pauseAllowed == false);
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
ASSERT(pTask->status.pauseAllowed == true);
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
streamSourceScanHistoryData(pTask);
}
// disable the pause when handling the step2 scan of tsdb data;
// the whole following procedure can't be stopped.
// todo fix it: the following procedure should run to completion, and only then shut down when trying to close the vnode.
if (pTask->info.fillHistory == 1) {
streamTaskDisablePause(pTask);
}
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
double el = (taosGetTimestampMs() - st) / 1000.0;
// the following procedure should be executed regardless of whether the status is stop/pause
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) {
@ -1217,25 +1212,24 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pStreamTask = NULL;
bool done = false;
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
// 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask);
streamMetaUnregisterTask(pMeta, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
// wait for the stream task get ready for scan history data
// 2. the stream task cannot be paused while it is in TASK_STATUS__SCAN_HISTORY status; wait for the
// stream task to get ready for scanning history data
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
@ -1245,6 +1239,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// now we can stop the stream task execution
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
@ -1252,42 +1247,36 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
}
if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs();
streamTaskEndScanWAL(pTask);
streamMetaReleaseTask(pMeta, pTask);
} else {
if (!streamTaskRecoverScanStep1Finished(pTask)) {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history from WAL after halting the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
pTask->tsInfo.step2Start = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
}
if (!streamTaskRecoverScanStep2Finished(pTask)) {
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
int64_t dstVer = pTask->dataRange.range.minVer - 1;
pTask->chkInfo.currentVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer,
TASK_SCHED_STATUS__INACTIVE);
}
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// set the fill-history task to be normal
if (pTask->info.fillHistory == 1) {
streamSetStatusNormal(pTask);
}
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task.
streamMetaReleaseTask(pMeta, pTask);
@ -1304,7 +1293,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug(
"s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time "
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey);
qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
@ -1500,7 +1489,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) {
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version);
streamProcessRunReq(pTask);
@ -1637,7 +1626,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) {
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartRecoverTask(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq);
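The launch path above guards scan-history execution with a compare-and-swap on schedStatus, so of all threads handling the request only one flips INACTIVE to WAITING and proceeds. A self-contained sketch of that guard, using C11 atomics in place of TDengine's atomic_val_compare_exchange_8 (enum values illustrative):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

enum { SCHED_INACTIVE = 1, SCHED_WAITING = 2 }; /* values illustrative */

typedef struct {
  _Atomic int8_t schedStatus;
} TaskSched;

/* Exactly one caller wins the INACTIVE -> WAITING transition and owns
 * the scan-history launch; everyone else returns without side effects. */
static bool trySchedule(TaskSched* s) {
  int8_t expected = SCHED_INACTIVE;
  return atomic_compare_exchange_strong(&s->schedStatus, &expected, SCHED_WAITING);
}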

View File

@ -256,14 +256,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) {
if (status != TASK_STATUS__NORMAL/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
// ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
ASSERT(status == TASK_STATUS__NORMAL);
// the maximum version of data in the WAL has already been reached; step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);
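The swapped assertion reflects that a fill-history task now scans the WAL under TASK_STATUS__NORMAL, stopping once it reaches the version bound captured when the related stream task was halted. A sketch of that stop condition (struct and names illustrative, not the real API):

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int64_t currentVer;    /* next WAL version to read */
  int64_t maxVer;        /* upper bound captured at halt time */
  bool    transferState; /* set once the bound is reached */
} FillHistoryScan;

static bool fillHistoryScanDone(FillHistoryScan* s) {
  if (s->currentVer > s->maxVer) {
    s->transferState = true; /* hand executor state back to the stream task */
    return true;
  }
  return false;
}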

View File

@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
return code;
}
/*
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL;
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosArrayPush(pLastArray, &lastCol);
}
return code;
}
*/
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0;
// fetch schema
@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->lruMutex);
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]);
taosThreadMutexLock(&pTsdb->lruMutex);
// taosThreadMutexLock(&pTsdb->lruMutex);
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
if (h) {
@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
taosThreadMutexUnlock(&pTsdb->lruMutex);
// taosThreadMutexUnlock(&pTsdb->lruMutex);
}
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksMayWrite(pTsdb, true, false, true);
taosThreadMutexUnlock(&pTsdb->lruMutex);
_exit:
taosMemoryFree(pTSchema);
@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
return code;
}
/*
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
}
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
*/
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
int32_t code = 0;
STSRow *cacheRow = NULL;
@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
}
if (record.version <= pReader->info.verRange.maxVer) {
/*
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
*/
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData);
}
@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto _err;
}
loadDataTomb(state->pr, state->pr->pFileReader);
state->pr->pCurFileSet = state->pFileSet;
loadDataTomb(state->pr, state->pr->pFileReader);
}
if (!state->pIndexList) {
@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->iBrinIndex = indexSize;
}
if (state->pFileSet != state->pr->pCurFileSet) {
state->pr->pCurFileSet = state->pFileSet;
}
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
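The lock changes above widen lruMutex to cover the whole multi-get/erase sequence in tsdbCacheDel, and the commented-out tsdbCacheGet preserves the double-checked fill it used: look up lock-free, and on a miss re-check under the mutex before loading, so concurrent missers don't load the same column twice. A toy, runnable sketch of that pattern (single slot instead of a real LRU; in the real code the first lookup is internally synchronized by the cache):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

static pthread_mutex_t gMu = PTHREAD_MUTEX_INITIALIZER;
static int* gSlot = NULL; /* stands in for one LRU cache entry */

static int* loadValue(void) { /* stands in for tsdbCacheLoadCol */
  int* v = malloc(sizeof(int));
  *v = 42;
  return v;
}

static int* getOrLoad(void) {
  int* v = gSlot;              /* first check, outside the lock */
  if (v == NULL) {
    pthread_mutex_lock(&gMu);
    v = gSlot;                 /* second check, under the lock */
    if (v == NULL) {
      v = gSlot = loadValue();
    }
    pthread_mutex_unlock(&gMu);
  }
  return v;
}

int main(void) {
  printf("%d\n", *getOrLoad());
  return 0;
}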

View File

@ -2121,8 +2121,7 @@ FETCH_NEXT_BLOCK:
return pInfo->pUpdateRes;
}
SSDataBlock* pBlock = pInfo->pRes;
SDataBlockInfo* pBlockInfo = &pBlock->info;
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
NEXT_SUBMIT_BLK:
@ -2146,35 +2145,36 @@ FETCH_NEXT_BLOCK:
}
}
blockDataCleanup(pBlock);
blockDataCleanup(pInfo->pRes);
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
SSDataBlock* pRes = NULL;
int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows,
id);
qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id);
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
qDebug("retrieve data failed, try next block in submit block, %s", id);
continue;
}
setBlockIntoRes(pInfo, pRes, false);
// filter the block extracted from WAL files, according to the time window
// apply additional time window filter
doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pRes->info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, pRes, false);
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
return pInfo->pCreateTbRes;
}
// apply additional time window filter
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
@ -2196,7 +2196,7 @@ FETCH_NEXT_BLOCK:
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
if (pBlockInfo->rows > 0) {
return pBlock;
return pInfo->pRes;
}
if (pInfo->pUpdateDataRes->info.rows > 0) {
@ -2779,7 +2779,7 @@ _error:
return NULL;
}
static SSDataBlock* getTableDataBlockImpl(void* param) {
static SSDataBlock* getBlockForTableMergeScan(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
@ -2797,6 +2797,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
@ -2805,8 +2806,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
if (isTaskKilled(pTaskInfo)) {
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
break;
}
// process this data block based on the probabilities
@ -2819,6 +2821,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
@ -2909,7 +2912,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
}
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
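The rename to getBlockForTableMergeScan makes the function's role explicit: it is the raw-data fetch callback registered with the sort handle, which pulls one block at a time until the callback returns NULL. A runnable sketch of that callback contract (types and names illustrative, not the real tsort API):

#include <stdio.h>

typedef struct { int rowsLeft; } Source; /* stands in for STableMergeScanSortSourceParam */
typedef int* (*FetchFp)(void* param);

static int gRow;
static int* fetchBlock(void* param) {    /* stands in for getBlockForTableMergeScan */
  Source* s = param;
  if (s->rowsLeft == 0) return NULL;     /* source drained: sort stops pulling */
  gRow = s->rowsLeft--;
  return &gRow;
}

int main(void) {
  Source s = {.rowsLeft = 3};
  FetchFp fp = fetchBlock;               /* registration, as in tsortSetFetchRawDataFp */
  for (int* b = fp(&s); b != NULL; b = fp(&s)) printf("block %d\n", *b);
  return 0;
}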

View File

@ -1049,12 +1049,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
if (pBlk == NULL) {
break;
};
}
if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
break;
}
}
tSimpleHashCleanup(mUidBlk);
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
if (!tsortIsClosed(pHandle)) {
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
}
taosArrayDestroy(aExtSrc);
pHandle->type = SORT_SINGLESOURCE_SORT;
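The added tsortIsClosed checks make the merge loop abort cleanly: staged blocks are destroyed, and the ordered sources are re-added only while the handle is still open. A sketch of the cleanup-on-close idea (names illustrative):

#include <stdbool.h>
#include <stdlib.h>

typedef struct {
  void** blocks; /* staged, heap-allocated blocks (aBlkSort stand-in) */
  int    count;
} Staged;

/* On early close, free everything staged so far and clear the array,
 * mirroring the blockDataDestroy/taosArrayClear sequence above. */
static void stagedAbortIfClosed(Staged* st, bool closed) {
  if (!closed) return;
  for (int i = 0; i < st->count; ++i) {
    free(st->blocks[i]);
  }
  st->count = 0;
}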

View File

@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
bool finished = false;
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return 0;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -404,6 +410,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
@ -414,6 +422,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// save to disk
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
@ -615,7 +624,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be committed here
if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
streamTaskRecoverSetAllStepFinished(pTask);
streamTaskEndScanWAL(pTask);
} else {
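The pause check added at the top of the scan loop reports elapsed time from tsInfo.step1Start, which the tq.c hunk above deliberately leaves untouched on resume so the timing spans pauses. A sketch of that check (taosGetTimestampMs replaced by a clock_gettime wrapper; names illustrative):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

static int64_t nowMs(void) { /* stands in for taosGetTimestampMs */
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Bail out of the scan loop on pause; step1Start is kept, so a later
 * resume continues the same timing window. */
static int scanOneBatch(bool paused, int64_t step1StartMs) {
  if (paused) {
    double el = (nowMs() - step1StartMs) / 1000.0;
    printf("paused from the scan-history task, elapsed time:%.2fsec\n", el);
    return 0;
  }
  /* ... scan one batch of history data ... */
  return 1;
}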

View File

@ -85,6 +85,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
streamTaskEnablePause(pTask);
streamTaskScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
@ -856,8 +857,19 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep(100);
}
// todo: use the lock of the task.
taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
taosWUnLockLatch(&pMeta->lock);
qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
taosWUnLockLatch(&pMeta->lock);
int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
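The pause path now re-reads the task status under the meta write lock, so a concurrent drop or stop observed after the initial check cannot be clobbered by TASK_STATUS__PAUSE. A runnable sketch of that re-check-under-lock transition (statuses and lock are illustrative stand-ins):

#include <pthread.h>
#include <stdint.h>

enum { ST_NORMAL, ST_DROPPING, ST_STOP, ST_PAUSE }; /* values illustrative */

typedef struct {
  pthread_rwlock_t lock; /* stands in for pMeta->lock */
  int8_t status;
  int8_t keepStatus;     /* previous status, restored on resume */
} Task;

static void taskPause(Task* t) {
  pthread_rwlock_wrlock(&t->lock);
  int8_t st = t->status; /* re-check under the lock */
  if (st == ST_DROPPING || st == ST_STOP) {
    pthread_rwlock_unlock(&t->lock);
    return;              /* terminal state wins; pause does nothing */
  }
  t->keepStatus = st;
  t->status = ST_PAUSE;
  pthread_rwlock_unlock(&t->lock);
}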

View File

@ -0,0 +1,104 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# The wal_retention_period and wal_retention_size options work well
#
import taos
from taos.tmq import Consumer
import os
import sys
import threading
import json
import time
import random
from datetime import date
from datetime import datetime
from datetime import timedelta
from os import path
topicName = "topic"
topicNum = 100
# consume topic
def consume_topic(topic_name, group, consume_cnt, index, wait):
consumer = Consumer(
{
"group.id": group,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
print(f"start consumer topic:{topic_name} group={group} index={index} ...")
consumer.subscribe([topic_name])
cnt = 0
try:
while cnt < consume_cnt:
res = consumer.poll(1)
if not res:
if wait:
continue
else:
break
err = res.error()
if err is not None:
raise err
val = res.value()
cnt += 1
print(f" consume {cnt} ")
for block in val:
datas = block.fetchall()
data = datas[0][:50]
print(f" {topic_name}_{group}_{index} {cnt} {data}")
finally:
consumer.unsubscribe()
consumer.close()
def consumerThread(index):
global topicName, topicNum
print(f' thread {index} start...')
while True:
idx = random.randint(0, topicNum - 1)
name = f"{topicName}{idx}"
group = f"group_{index}_{idx}"
consume_topic(name, group, 100, index, True)
if __name__ == "__main__":
print(sys.argv)
threadCnt = 10
if len(sys.argv) > 1:
threadCnt = int(sys.argv[1])
threads = []
print(f'consumer with {threadCnt} threads...')
for i in range(threadCnt):
x = threading.Thread(target=consumerThread, args=(i,))
x.start()
threads.append(x)
# wait
for i, thread in enumerate(threads):
thread.join()
print(f'join thread {i} end.')

View File

@ -0,0 +1,114 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
import random
import time
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
# prepareEnv
def prepareEnv(self):
self.dbName = "mullevel"
self.stbName = "meters"
self.topicName = "topic"
self.topicNum = 100
self.loop = 50000
sql = f"use {self.dbName}"
tdSql.execute(sql)
# generate topic sql
self.sqls = [
f"select * from {self.stbName}",
f"select * from {self.stbName} where ui < 200",
f"select * from {self.stbName} where fc > 20.1",
f"select * from {self.stbName} where nch like '%%a%%'",
f"select * from {self.stbName} where fc > 20.1",
f"select lower(bin) from {self.stbName} where length(bin) < 10;",
f"select upper(bin) from {self.stbName} where length(nch) > 10;",
f"select upper(bin) from {self.stbName} where ti > 10 or ic < 40;",
f"select * from {self.stbName} where ic < 100 "
]
# create topics
def createTopics(self):
for i in range(self.topicNum):
topicName = f"{self.topicName}{i}"
sql = random.choice(self.sqls)
createSql = f"create topic if not exists {topicName} as {sql}"
try:
tdSql.execute(createSql, 3, True)
except:
tdLog.info(f" create topic {topicName} failed.")
# randomly drop and recreate topics
def managerTopics(self):
for i in range(self.loop):
tdLog.info(f"start modify loop={i}")
idx = random.randint(0, self.topicNum - 1)
# delete
topicName = f"{self.topicName}{idx}"
sql = f"drop topic if exist {topicName}"
try:
tdSql.execute(sql, 3, True)
except:
tdLog.info(f" drop topic {topicName} failed.")
# create topic
sql = random.choice(self.sqls)
createSql = f"create topic if not exists {topicName} as {sql}"
try:
tdSql.execute(createSql, 3, True)
except:
tdLog.info(f" create topic {topicName} failed.")
seconds = [0.1, 0.5, 3, 2.5, 1.5, 0.4, 5.2, 2.6, 0.4, 0.2]
time.sleep(random.choice(seconds))
# run
def run(self):
# prepare env
self.prepareEnv()
# create topic
self.createTopics()
# modify topic
self.managerTopics()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())