diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 282b527b31..51f30aca21 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -884,7 +884,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTailFuncEnv, .initFunc = tailFunctionSetup, .processFunc = tailFunction, - .finalizeFunc = NULL + .finalizeFunc = tailFinalize }, { .name = "abs", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 02382cc228..7081a65dd0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -164,10 +164,10 @@ typedef struct SSampleInfo { int64_t *timestamp; } SSampleInfo; -typedef struct STailUnit { +typedef struct STailItem { int64_t timestamp; char data[]; -} STailUnit; +} STailItem; typedef struct STailInfo { int32_t numOfPoints; @@ -175,7 +175,7 @@ typedef struct STailInfo { int32_t offset; uint8_t colType; int16_t colBytes; - STailUnit **pRes; + STailItem **pItems; } STailInfo; #define SET_VAL(_info, numOfElem, res) \ @@ -3164,7 +3164,7 @@ bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); int32_t numOfPoints = pVal->datum.i; - pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailUnit) + pCol->node.resType.bytes); + pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes); return true; } @@ -3184,36 +3184,37 @@ bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) { return false; } - pInfo->pRes = (STailUnit **)((char *)pInfo + sizeof(STailInfo)); - char *pUnit = (char *)pInfo->pRes + pInfo->numOfPoints * POINTER_BYTES; + pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo)); + char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES; - size_t unitSize = sizeof(STailUnit) + pInfo->colBytes; + size_t unitSize = sizeof(STailItem) + pInfo->colBytes; for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { - pInfo->pRes[i] = (STailUnit *)(pUnit + i * unitSize); + pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize); } return true; } -static void tailAssignResult(STailUnit* pUnit, char *data, int32_t colBytes, TSKEY ts) { - pUnit->timestamp = ts; - memcpy(pUnit->data, data, colBytes); +static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts) { + pItem->timestamp = ts; + memcpy(pItem->data, data, colBytes); } static int32_t tailCompFn(const void *p1, const void *p2, const void *param) { - STailUnit *d1 = *(STailUnit **)p1; - STailUnit *d2 = *(STailUnit **)p2; + STailItem *d1 = *(STailItem **)p1; + STailItem *d2 = *(STailItem **)p2; return compareInt64Val(&d1->timestamp, &d2->timestamp); } static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts) { - STailUnit **pList = pInfo->pRes; + STailItem **pList = pInfo->pItems; if (pInfo->numAdded < pInfo->numOfPoints) { tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts); - taosheapsort((void *)pList, sizeof(STailUnit **), pInfo->numAdded + 1, NULL, tailCompFn, 0); + taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0); + pInfo->numAdded++; } else if (pList[0]->timestamp < ts) { tailAssignResult(pList[0], data, pInfo->colBytes, ts); - taosheapadjust((void *)pList, sizeof(STailUnit **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0); + taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0); } } @@ -3231,7 +3232,7 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) { int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - //colDataAppendNULL(pOutput, i); + colDataAppendNULL(pOutput, i); continue; } @@ -3239,5 +3240,34 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) { doTailAdd(pInfo, data, tsList[i]); } + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + int32_t pos = startOffset + i; + STailItem *pItem = pInfo->pItems[i]; + colDataAppend(pOutput, pos, pItem->data, false); + } + return pInfo->numOfPoints; } + +int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + pEntryInfo->complete = true; + + int32_t type = pCtx->input.pData[0]->info.type; + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + // todo assign the tag value and the corresponding row data + int32_t currentRow = pBlock->info.rows; + for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { + STailItem *pItem = pInfo->pItems[i]; + colDataAppend(pCol, currentRow, pItem->data, false); + + //setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow); + currentRow += 1; + } + + return pEntryInfo->numOfRes; +}