Merge pull request #15595 from taosdata/feature/stream

enh(wal): add lock to guarantee read behaviour
This commit is contained in:
Liu Jicong 2022-07-30 17:09:20 +08:00 committed by GitHub
commit c67f5f10a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 26 additions and 22 deletions

View File

@ -684,6 +684,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
/*SMeta* pMeta = pTq->pVnode->pMeta;*/
/*tdbTbUpsert(pMeta->pTaskIdx, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn);*/
return 0; return 0;
FAIL: FAIL:
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);

View File

@ -48,7 +48,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
/*pInfo->assignBlockUid = assignUid;*/
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
// prevent setting a different type of block // prevent setting a different type of block

View File

@ -481,10 +481,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pBlock->info.groupId = *groupId; pBlock->info.groupId = *groupId;
} }
if (pTableScanInfo->assignBlockUid) {
pBlock->info.groupId = pBlock->info.uid;
}
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
@ -1174,12 +1170,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.groupId = 0; pInfo->pRes->info.groupId = 0;
} }
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = pBlock->info.uid;
}
// todo extract method // todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);

View File

@ -2236,7 +2236,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "interp", .name = "interp",
.type = FUNCTION_TYPE_INTERP, .type = FUNCTION_TYPE_INTERP,
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.getEnvFunc = getSelectivityFuncEnv, .getEnvFunc = getSelectivityFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
@ -2246,7 +2246,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "derivative", .name = "derivative",
.type = FUNCTION_TYPE_DERIVATIVE, .type = FUNCTION_TYPE_DERIVATIVE,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_CUMULATIVE_FUNC, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateDerivative, .translateFunc = translateDerivative,
.getEnvFunc = getDerivativeFuncEnv, .getEnvFunc = getDerivativeFuncEnv,
.initFunc = derivativeFuncSetup, .initFunc = derivativeFuncSetup,
@ -2280,7 +2280,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "_cache_last_row", .name = "_cache_last_row",
.type = FUNCTION_TYPE_CACHE_LAST_ROW, .type = FUNCTION_TYPE_CACHE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
@ -2375,7 +2375,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "histogram", .name = "histogram",
.type = FUNCTION_TYPE_HISTOGRAM, .type = FUNCTION_TYPE_HISTOGRAM,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_FORBID_FILL_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_FORBID_FILL_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateHistogram, .translateFunc = translateHistogram,
.getEnvFunc = getHistogramFuncEnv, .getEnvFunc = getHistogramFuncEnv,
.initFunc = histogramFunctionSetup, .initFunc = histogramFunctionSetup,
@ -2460,7 +2460,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = diffFunction, .processFunc = diffFunction,
.sprocessFunc = diffScalarFunction, .sprocessFunc = diffScalarFunction,
.finalizeFunc = functionFinalize, .finalizeFunc = functionFinalize,
.estimateReturnRowsFunc = diffEstReturnRows .estimateReturnRowsFunc = diffEstReturnRows,
}, },
{ {
.name = "statecount", .name = "statecount",
@ -2521,8 +2521,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "tail", .name = "tail",
.type = FUNCTION_TYPE_TAIL, .type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_IMPLICIT_TS_FUNC, FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateTail, .translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv, .getEnvFunc = getTailFuncEnv,
.initFunc = tailFunctionSetup, .initFunc = tailFunctionSetup,

View File

@ -462,7 +462,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
if (streamDispatchAllBlocks(pTask, pBlock) < 0) { if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
ASSERT(0); ASSERT(0);
code = -1; code = -1;
// TODO set status fail streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
goto FREE; goto FREE;
} }
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/ /*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/

View File

@ -441,9 +441,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
return -1; return -1;
} }
taosThreadMutexLock(&pReader->mutex);
if (pReader->curInvalid || pReader->curVersion != ver) { if (pReader->curInvalid || pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) { if (walReadSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
seeked = true; seeked = true;
@ -464,6 +467,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
} }
@ -473,6 +477,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
ver); ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
@ -480,6 +485,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen); void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
pReader->pHead = ptr; pReader->pHead = ptr;
@ -494,6 +500,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
} }
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
@ -503,6 +510,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
pReader->curInvalid = 1; pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
@ -516,9 +524,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
pReader->curInvalid = 1; pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
pReader->curVersion++; pReader->curVersion++;
taosThreadMutexUnlock(&pReader->mutex);
return 0; return 0;
} }

View File

@ -408,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
pWal->writeHead.head.ingestTs = taosGetTimestampMs(); pWal->writeHead.head.ingestTs = 0;
// sync info for sync module // sync info for sync module
pWal->writeHead.head.syncMeta = syncMeta; pWal->writeHead.head.syncMeta = syncMeta;