diff --git a/examples/c/tmq.c b/examples/c/tmq.c index fd1f146618..3686251b4b 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -29,7 +29,7 @@ static void msg_process(TAOS_RES* msg) { printf("vg: %d\n", tmq_get_vgroup_id(msg)); if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { tmq_raw_data raw = {0}; - int32_t code = tmq_get_raw(msg, &raw); + int32_t code = tmq_get_raw(msg, &raw); if (code == 0) { TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -302,7 +302,7 @@ int32_t create_topic() { } taos_free_result(pRes); -// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + // pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 364ecbab61..86b016b8be 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -684,6 +684,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + /*SMeta* pMeta = pTq->pVnode->pMeta;*/ + /*tdbTbUpsert(pMeta->pTaskIdx, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn);*/ + return 0; FAIL: if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8f3ecafbea..775017b8dd 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -48,7 +48,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - /*pInfo->assignBlockUid = assignUid;*/ // TODO: if a block was set but not consumed, // prevent setting a different type of block diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fc67f3da6c..e38034f4aa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -481,10 +481,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pBlock->info.groupId = *groupId; } - if (pTableScanInfo->assignBlockUid) { - pBlock->info.groupId = pBlock->info.uid; - } - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -1174,12 +1170,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.groupId = 0; } - // for generating rollup SMA result, each time is an independent time serie. - // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this - if (pInfo->assignBlockUid) { - pInfo->pRes->info.groupId = pBlock->info.uid; - } - // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 1db8264354..e58e7475df 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2236,7 +2236,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "interp", .type = FUNCTION_TYPE_INTERP, - .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getSelectivityFuncEnv, .initFunc = functionSetup, @@ -2246,7 +2246,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "derivative", .type = FUNCTION_TYPE_DERIVATIVE, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_CUMULATIVE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateDerivative, .getEnvFunc = getDerivativeFuncEnv, .initFunc = derivativeFuncSetup, @@ -2280,7 +2280,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_cache_last_row", .type = FUNCTION_TYPE_CACHE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2375,7 +2375,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "histogram", .type = FUNCTION_TYPE_HISTOGRAM, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_FORBID_FILL_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_FORBID_FILL_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateHistogram, .getEnvFunc = getHistogramFuncEnv, .initFunc = histogramFunctionSetup, @@ -2460,7 +2460,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = diffFunction, .sprocessFunc = diffScalarFunction, .finalizeFunc = functionFinalize, - .estimateReturnRowsFunc = diffEstReturnRows + .estimateReturnRowsFunc = diffEstReturnRows, }, { .name = "statecount", @@ -2521,8 +2521,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "tail", .type = FUNCTION_TYPE_TAIL, - .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | - FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateTail, .getEnvFunc = getTailFuncEnv, .initFunc = tailFunctionSetup, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ec1dd693e1..95e5b61dbd 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -462,7 +462,8 @@ int32_t streamDispatch(SStreamTask* pTask) { if (streamDispatchAllBlocks(pTask, pBlock) < 0) { ASSERT(0); code = -1; - // TODO set status fail + streamQueueProcessFail(pTask->outputQueue); + atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); goto FREE; } /*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/ diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 787c9af317..9be648b518 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -441,9 +441,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { return -1; } + taosThreadMutexLock(&pReader->mutex); + if (pReader->curInvalid || pReader->curVersion != ver) { if (walReadSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); + taosThreadMutexUnlock(&pReader->mutex); return -1; } seeked = true; @@ -464,6 +467,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } ASSERT(0); + taosThreadMutexUnlock(&pReader->mutex); return -1; } } @@ -473,6 +477,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -480,6 +485,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; + taosThreadMutexUnlock(&pReader->mutex); return -1; } pReader->pHead = ptr; @@ -494,6 +500,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); } + taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -503,6 +510,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); + taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -516,9 +524,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); + taosThreadMutexUnlock(&pReader->mutex); return -1; } pReader->curVersion++; + taosThreadMutexUnlock(&pReader->mutex); + return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 4eadc92f70..68d8c54e28 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -408,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.head.version = index; pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; - pWal->writeHead.head.ingestTs = taosGetTimestampMs(); + pWal->writeHead.head.ingestTs = 0; // sync info for sync module pWal->writeHead.head.syncMeta = syncMeta;