diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b7c23c8527..9ed2b25fdf 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -268,6 +268,7 @@ struct SVnode { tsem_t canCommit; int64_t sync; int32_t blockCount; + bool restored; tsem_t syncSem; SQHandle* pQuery; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index f318c69c6f..872357fc93 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1395,10 +1395,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { break; case TSDB_DATA_TYPE_BOOL: break; - case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_TINYINT:{ + pColAgg->sum += colVal.value.i8; + if (pColAgg->min > colVal.value.i8) { + pColAgg->min = colVal.value.i8; + } + if (pColAgg->max < colVal.value.i8) { + pColAgg->max = colVal.value.i8; + } break; - case TSDB_DATA_TYPE_SMALLINT: + } + case TSDB_DATA_TYPE_SMALLINT:{ + pColAgg->sum += colVal.value.i16; + if (pColAgg->min > colVal.value.i16) { + pColAgg->min = colVal.value.i16; + } + if (pColAgg->max < colVal.value.i16) { + pColAgg->max = colVal.value.i16; + } break; + } case TSDB_DATA_TYPE_INT: { pColAgg->sum += colVal.value.i32; if (pColAgg->min > colVal.value.i32) { @@ -1419,24 +1435,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_FLOAT:{ + pColAgg->sum += colVal.value.f; + if (pColAgg->min > colVal.value.f) { + pColAgg->min = colVal.value.f; + } + if (pColAgg->max < colVal.value.f) { + pColAgg->max = colVal.value.f; + } break; - case TSDB_DATA_TYPE_DOUBLE: + } + case TSDB_DATA_TYPE_DOUBLE:{ + pColAgg->sum += colVal.value.d; + if (pColAgg->min > colVal.value.d) { + pColAgg->min = colVal.value.d; + } + if (pColAgg->max < colVal.value.d) { + pColAgg->max = colVal.value.d; + } break; + } case TSDB_DATA_TYPE_VARCHAR: break; - case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_TIMESTAMP:{ + if (pColAgg->min > colVal.value.i64) { + pColAgg->min = colVal.value.i64; + } + if (pColAgg->max < colVal.value.i64) { + pColAgg->max = colVal.value.i64; + } break; + } case TSDB_DATA_TYPE_NCHAR: break; - case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_UTINYINT:{ + pColAgg->sum += colVal.value.u8; + if (pColAgg->min > colVal.value.u8) { + pColAgg->min = colVal.value.u8; + } + if (pColAgg->max < colVal.value.u8) { + pColAgg->max = colVal.value.u8; + } break; - case TSDB_DATA_TYPE_USMALLINT: + } + case TSDB_DATA_TYPE_USMALLINT:{ + pColAgg->sum += colVal.value.u16; + if (pColAgg->min > colVal.value.u16) { + pColAgg->min = colVal.value.u16; + } + if (pColAgg->max < colVal.value.u16) { + pColAgg->max = colVal.value.u16; + } break; - case TSDB_DATA_TYPE_UINT: + } + case TSDB_DATA_TYPE_UINT:{ + pColAgg->sum += colVal.value.u32; + if (pColAgg->min > colVal.value.u32) { + pColAgg->min = colVal.value.u32; + } + if (pColAgg->max < colVal.value.u32) { + pColAgg->max = colVal.value.u32; + } break; - case TSDB_DATA_TYPE_UBIGINT: + } + case TSDB_DATA_TYPE_UBIGINT:{ + pColAgg->sum += colVal.value.u64; + if (pColAgg->min > colVal.value.u64) { + pColAgg->min = colVal.value.u64; + } + if (pColAgg->max < colVal.value.u64) { + pColAgg->max = colVal.value.u64; + } break; + } case TSDB_DATA_TYPE_JSON: break; case TSDB_DATA_TYPE_VARBINARY: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6bc057e5ac..a0e2354f51 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -224,9 +224,19 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg, isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); + if (!pVnode->restored) { + vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg); + terrno = TSDB_CODE_APP_NOT_READY; + vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + continue; + } + if (pMsgArr == NULL || pIsWeakArr == NULL) { vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg); - vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_OUT_OF_MEMORY); + terrno = TSDB_CODE_OUT_OF_MEMORY; + vnodeHandleProposeError(pVnode, pMsg, terrno); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); continue; @@ -609,6 +619,12 @@ static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsm SVnode *pVnode = pFsm->data; } +static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { + SVnode *pVnode = pFsm->data; + pVnode->restored = true; + vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); +} + static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; @@ -616,7 +632,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; - pFsm->FpRestoreFinishCb = NULL; + pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; @@ -670,11 +686,10 @@ bool vnodeIsLeader(SVnode *pVnode) { return false; } - // todo - // if (!pVnode->restored) { - // terrno = TSDB_CODE_APP_NOT_READY; - // return false; - // } + if (!pVnode->restored) { + terrno = TSDB_CODE_APP_NOT_READY; + return false; + } return true; } \ No newline at end of file diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d8cd76d31e..f249321a76 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -416,6 +416,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { } if (isTaskKilled(pTaskInfo)) { + atomic_store_64(&pTaskInfo->owner, 0); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 34149d7499..3f3df6a8f0 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -69,7 +69,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys // todo remove it soon if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pInfo->mergeDataBlocks = true; + pInfo->mergeDataBlocks = false; } int32_t numOfRows = 4096; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 324a17320e..43a5d19ada 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1532,7 +1532,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } uint8_t resType; - if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) { + if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType || TSDB_DATA_TYPE_TIMESTAMP == colType) { resType = TSDB_DATA_TYPE_BIGINT; } else { resType = TSDB_DATA_TYPE_DOUBLE;