Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_new_format
This commit is contained in:
commit
30f53e2f5a
|
@ -148,6 +148,7 @@ struct SRSmaInfoItem {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SRSmaInfo {
|
struct SRSmaInfo {
|
||||||
|
SSma *pSma;
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
int64_t lastRecv; // ms
|
int64_t lastRecv; // ms
|
||||||
|
|
|
@ -188,13 +188,15 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
|
for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
|
||||||
SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
|
SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
|
||||||
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
|
int32_t oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1);
|
||||||
|
if ((oldVal <= 1) && (pTaskF->version < committed)) {
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
||||||
if (taosRemoveFile(qTaskInfoFullName) < 0) {
|
if (taosRemoveFile(qTaskInfoFullName) < 0) {
|
||||||
smaWarn("vgId:%d, cleanup qinf, failed to remove %s since %s", TD_VID(pVnode), qTaskInfoFullName,
|
smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
|
||||||
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, cleanup qinf, success to remove %s", TD_VID(pVnode), qTaskInfoFullName);
|
smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed,
|
||||||
|
qTaskInfoFullName);
|
||||||
}
|
}
|
||||||
taosArrayRemove(pFS->aQTaskInf, i);
|
taosArrayRemove(pFS->aQTaskInf, i);
|
||||||
continue;
|
continue;
|
||||||
|
@ -364,7 +366,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||||
SRSmaInfoItem *pItem = NULL;
|
|
||||||
|
|
||||||
// step 1: merge qTaskInfo and iQTaskInfo
|
// step 1: merge qTaskInfo and iQTaskInfo
|
||||||
// lock
|
// lock
|
||||||
|
|
|
@ -62,7 +62,7 @@ int32_t smaInit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
|
int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
|
||||||
smaMgmt.refHash = taosHashInit(1, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
|
smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
|
||||||
if (!smaMgmt.refHash) {
|
if (!smaMgmt.refHash) {
|
||||||
taosCloseRef(smaMgmt.rsetId);
|
taosCloseRef(smaMgmt.rsetId);
|
||||||
atomic_store_8(&smaMgmt.inited, 0);
|
atomic_store_8(&smaMgmt.inited, 0);
|
||||||
|
@ -107,6 +107,7 @@ void smaCleanUp() {
|
||||||
if (old == 1) {
|
if (old == 1) {
|
||||||
taosCloseRef(smaMgmt.rsetId);
|
taosCloseRef(smaMgmt.rsetId);
|
||||||
taosHashCleanup(smaMgmt.refHash);
|
taosHashCleanup(smaMgmt.refHash);
|
||||||
|
smaMgmt.refHash = NULL;
|
||||||
taosTmrCleanUp(smaMgmt.tmrHandle);
|
taosTmrCleanUp(smaMgmt.tmrHandle);
|
||||||
smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
|
smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
|
||||||
atomic_store_8(&smaMgmt.inited, 0);
|
atomic_store_8(&smaMgmt.inited, 0);
|
||||||
|
@ -220,7 +221,7 @@ static void tRSmaInfoHashFreeNode(void *data) {
|
||||||
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
|
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
|
||||||
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
|
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
tdFreeRSmaInfo(NULL, pRSmaInfo, true);
|
tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -121,27 +121,27 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
|
||||||
*/
|
*/
|
||||||
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
||||||
if (pInfo) {
|
if (pInfo) {
|
||||||
int32_t vid = pSma ? SMA_VID(pSma) : -1;
|
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
SRSmaInfoItem *pItem = &pInfo->items[i];
|
SRSmaInfoItem *pItem = &pInfo->items[i];
|
||||||
|
|
||||||
if (isDeepFree && pItem->tmrId) {
|
if (isDeepFree && pItem->tmrId) {
|
||||||
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", vid, pItem->tmrId, pInfo->suid, i + 1);
|
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
|
||||||
|
pInfo->suid, i + 1);
|
||||||
taosTmrStopA(&pItem->tmrId);
|
taosTmrStopA(&pItem->tmrId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isDeepFree && pInfo->taskInfo[i]) {
|
if (isDeepFree && pInfo->taskInfo[i]) {
|
||||||
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], vid, i + 1);
|
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", vid,
|
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
|
||||||
pInfo->suid, i + 1);
|
pInfo->suid, i + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->iTaskInfo[i]) {
|
if (pInfo->iTaskInfo[i]) {
|
||||||
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], vid, i + 1);
|
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", vid,
|
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
|
||||||
pInfo->suid, i + 1);
|
SMA_VID(pSma), pInfo->suid, i + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isDeepFree) {
|
if (isDeepFree) {
|
||||||
|
@ -377,7 +377,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
pRSmaInfo->pSma = pSma;
|
||||||
pRSmaInfo->pTSchema = pTSchema;
|
pRSmaInfo->pTSchema = pTSchema;
|
||||||
|
pRSmaInfo->suid = suid;
|
||||||
|
T_REF_INIT_VAL(pRSmaInfo, 1);
|
||||||
if (!(pRSmaInfo->queue = taosOpenQueue())) {
|
if (!(pRSmaInfo->queue = taosOpenQueue())) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -391,8 +394,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
if (!(pRSmaInfo->iQall = taosAllocateQall())) {
|
if (!(pRSmaInfo->iQall = taosAllocateQall())) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pRSmaInfo->suid = suid;
|
|
||||||
T_REF_INIT_VAL(pRSmaInfo, 1);
|
|
||||||
|
|
||||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -1586,7 +1587,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
|
|
||||||
if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
|
if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
|
||||||
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
|
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
|
||||||
smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
||||||
taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES);
|
taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1636,7 +1637,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
switch (fetchTriggerStat) {
|
switch (fetchTriggerStat) {
|
||||||
case TASK_TRIGGER_STAT_ACTIVE: {
|
case TASK_TRIGGER_STAT_ACTIVE: {
|
||||||
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
|
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
|
||||||
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
|
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
|
||||||
// async process
|
// async process
|
||||||
pItem->fetchLevel = pItem->level;
|
pItem->fetchLevel = pItem->level;
|
||||||
#if 0
|
#if 0
|
||||||
|
|
|
@ -97,6 +97,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }
|
||||||
|
|
||||||
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
|
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
|
||||||
|
@ -107,11 +109,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
||||||
if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
SStreamScanInfo* pInfo = pOptrInfo->info;
|
SStreamScanInfo* pInfo = pOptrInfo->info;
|
||||||
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) {
|
taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory);
|
||||||
SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i);
|
|
||||||
taosArrayDestroy(p->pDataBlock);
|
|
||||||
taosMemoryFreeClear(p);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue