{% extends "base.html" %}
{% block title %}Telekom Data Raporu{% endblock %}
{% block page_title %}Telekom Data Raporu{% endblock %}

{# If you want, add a link for this page to the nav menu in base.html and mark it active here #}
{# {% block nav_telekom_active %}active{% endblock %} #}

{% block head_extra %}
{% endblock %}

{% block content %}
Telekom Kullanım Raporu
Cihaz / Uygulama / Tarih filtreleri + KPI özetleri
Toplam Kullanım
Volume (KB) toplamı
Download
Download (KB) toplamı
Upload
Upload (KB) toplamı
Kayıt Sayısı
Filtre sonrası satır adedi
Detay Liste
Cihaz, zaman, uygulama, upload/download/volume
Hazır
Device Name | Device ID | Date & Time | Type | Roaming | Carrier | Application ID | Upload (KB) | Download (KB) | Volume (KB)
Henüz veri yok. Filtreleri seçip “Raporu Getir”e bas.
Kaynak: app/data/telekom1.csv
0 satır
{% endblock %}

# =========================
# app/main.py (UPDATED) - PART 1/2
# Paste this from the top of your file down to the end of filtered_outer_sql()
# =========================
from __future__ import annotations

import os
import time
import csv
from pathlib import Path
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional

from dotenv import load_dotenv
from fastapi import FastAPI, Query, Request, Form
from fastapi.responses import (
    HTMLResponse,
    JSONResponse,
    StreamingResponse,
    RedirectResponse,
)
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles

from openpyxl import load_workbook, Workbook
from openpyxl.utils import get_column_letter

# DB helpers
from app.db import fetch_all, fetch_one

# Router
from app.musteriler import router as musteriler_router

load_dotenv()

app = FastAPI(title="Panel Rapor", version="1.0.0")

# =============================================================================
# PATHS
# =============================================================================
APP_DIR = Path(__file__).resolve().parent
BASE_DIR = APP_DIR.parent
TEMPLATES_DIR = BASE_DIR / "templates"
STATIC_DIR = BASE_DIR / "static"
DATA_DIR = APP_DIR / "data"
DEFAULT_EXCEL_PATH = DATA_DIR / "users.xlsx"  # expected correct file name

templates = Jinja2Templates(directory=str(TEMPLATES_DIR))

if STATIC_DIR.is_dir():
    app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")

app.include_router(musteriler_router, prefix="/api/musteriler", tags=["Müşteriler"])

# =============================================================================
# AUTO LOGIN CONFIG
# =============================================================================
TARGETS: Dict[str, str] = {
    "target1": "https://esiparisv2.alliance-healthcare.com.tr",
    "target2": "https://esiparisv2.alliance-healthcare.com.tr/login",
}

SEL_PHARMACY = "#pharmacyCode"
SEL_USERNAME = "#Customer_username"
SEL_PASSWORD = "#Customer_password"
SEL_LOGIN_BTN = "button.Customer_login_button"


def get_excel_path() -> Path:
    """
    Excel path priority:
    1) ENV: EXCEL_LOGIN_PATH
    2) Default: app/data/users.xlsx
    Also: if user mistakenly created users.xslx, try to auto-detect it.
    """
    env_path = os.getenv("EXCEL_LOGIN_PATH")
    if env_path:
        p = Path(env_path).expanduser().resolve()
        return p
    if DEFAULT_EXCEL_PATH.exists():
        return DEFAULT_EXCEL_PATH
    typo = DATA_DIR / "users.xslx"
    if typo.exists():
        return typo
    return DEFAULT_EXCEL_PATH


def _norm_header(x: str) -> str:
    return str(x or "").strip().lower()


def load_users_from_excel(excel_path: Path) -> List[Dict[str, str]]:
    """
    Reads users from Excel using openpyxl (no pandas needed).

    Required columns (exact):
      - customer_code
      - username
      - password
    Optional:
      - musteri (display name only)

    Returns: [{musteri, customer_code, username, password}, ...]
    """
    if not excel_path.exists():
        return []
    try:
        wb = load_workbook(filename=str(excel_path), data_only=True)
    except Exception:
        return []

    ws = wb.active
    rows = list(ws.iter_rows(values_only=True))
    if not rows:
        return []

    headers = [_norm_header(c) for c in rows[0]]
    if not headers:
        return []

    idx = {h: i for i, h in enumerate(headers) if h}
    required = ["customer_code", "username", "password"]
    if not all(k in idx for k in required):
        return []

    out: List[Dict[str, str]] = []
    for r in rows[1:]:
        if not r:
            continue

        def getv(key: str) -> str:
            i = idx.get(key)
            if i is None or i >= len(r):
                return ""
            v = r[i]
            if v is None:
                return ""
            return str(v).strip()

        customer_code = getv("customer_code")
        username = getv("username")
        password = getv("password")
        musteri = getv("musteri") if "musteri" in idx else ""

        if not customer_code or not username:
            continue

        out.append(
            {
                "musteri": musteri,
                "customer_code": customer_code,
                "username": username,
                "password": password,
            }
        )
    return out


def base_context(request: Request) -> Dict:
    return {
        "request": request,
        "targets": TARGETS,
        "db_connected": True,
        "excel_path": str(get_excel_path()),
    }


# =============================================================================
# PLAYWRIGHT AUTO LOGIN
# =============================================================================
def run_auto_login(url: str, customer_code: str, username: str, password: str) -> None:
    headless = os.getenv("HEADLESS", "0").strip() == "1"
    try:
        from playwright.sync_api import sync_playwright
    except Exception as e:
        raise RuntimeError(
            'Playwright is not installed. Run: "pip install playwright" and then: "playwright install"'
        ) from e

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        context = browser.new_context()
        page = context.new_page()

        page.goto(url, wait_until="domcontentloaded", timeout=60000)

        page.wait_for_selector(SEL_PHARMACY, timeout=30000)
        page.wait_for_selector(SEL_USERNAME, timeout=30000)
        page.wait_for_selector(SEL_PASSWORD, timeout=30000)

        page.fill(SEL_PHARMACY, customer_code)
        page.fill(SEL_USERNAME, username)
        page.fill(SEL_PASSWORD, password)

        btn = page.locator(SEL_LOGIN_BTN).filter(has_text="Giriş Yap")
        if btn.count() == 0:
            btn = page.locator(SEL_LOGIN_BTN)
        btn.first.wait_for(state="visible", timeout=30000)
        btn.first.click()

        page.wait_for_timeout(3000)

        keep_open_sec = int(os.getenv("KEEP_BROWSER_OPEN_SEC", "2"))
        if not headless and keep_open_sec > 0:
            page.wait_for_timeout(keep_open_sec * 1000)

        context.close()
        browser.close()


# =============================================================================
# DATE HELPERS
# =============================================================================
def _date_start_dt(d: date) -> datetime:
    return datetime(d.year, d.month, d.day, 0, 0, 0)


def _date_end_dt_exclusive(d: date) -> datetime:
    return datetime(d.year, d.month, d.day, 0, 0, 0) + timedelta(days=1)


def _first_day_of_month(dt: datetime) -> datetime:
    return datetime(dt.year, dt.month, 1, 0, 0, 0)


def _add_months(dt: datetime, months: int) -> datetime:
    y = dt.year + (dt.month - 1 + months) // 12
    m = (dt.month - 1 + months) % 12 + 1
    return datetime(y, m, 1, 0, 0, 0)

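# -----------------------------------------------------------------------------
# Illustrative .env sketch (not part of the original file): these are the
# environment variables the code above reads via os.getenv(), with placeholder
# values, assuming python-dotenv (load_dotenv above) picks up a .env file at
# the working directory.
#
#   EXCEL_LOGIN_PATH=/path/to/users.xlsx    # optional override of app/data/users.xlsx
#   HEADLESS=0                              # "1" runs Playwright without a visible browser
#   KEEP_BROWSER_OPEN_SEC=5                 # keep the headed browser open after login
# -----------------------------------------------------------------------------
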
# =============================================================================
# TELEKOM (CSV) CONFIG + HELPERS ✅ FIXED (reads app/data/telekom1.csv)
# =============================================================================
TELEKOM_CSV_PATH = DATA_DIR / "telekom1.csv"


def _parse_float(x: str) -> float:
    try:
        s = (x or "").strip().replace(",", "")
        return float(s) if s else 0.0
    except Exception:
        return 0.0


def _parse_dt_telekom(s: str) -> datetime | None:
    if not s:
        return None
    t = " ".join(str(s).strip().split())  # normalize double spaces
    fmts = ["%m/%d/%y %I:%M:%S %p", "%m/%d/%Y %I:%M:%S %p"]
    for f in fmts:
        try:
            return datetime.strptime(t, f)
        except Exception:
            pass
    return None


def read_telekom_csv_rows() -> list[dict]:
    if not TELEKOM_CSV_PATH.exists():
        return []
    rows: list[dict] = []
    with open(TELEKOM_CSV_PATH, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        for r in reader:
            dt = _parse_dt_telekom(r.get("Date & Time", ""))
            if not dt:
                continue
            rows.append(
                {
                    "dt": dt,
                    "device_name": (r.get("Device Name") or "").strip(),
                    "device_id": (r.get("Device ID") or "").strip(),  # important: strip right-padding
                    "application_id": (r.get("Application ID") or "").strip(),
                    "carrier": (r.get("Carrier") or "").strip(),
                    "type": (r.get("Type") or "").strip(),
                    "roaming": (r.get("Roaming") or "").strip(),
                    "upload_kb": _parse_float(r.get("Upload (KB)", "0")),
                    "download_kb": _parse_float(r.get("Download (KB)", "0")),
                    "volume_kb": _parse_float(r.get("Volume (KB)", "0")),
                }
            )
    return rows


def filter_telekom_rows(
    rows: list[dict],
    start: date,
    end: date,
    device_id: str | None = None,
    app: str | None = None,
    carrier: str | None = None,
    roaming: str | None = None,
    typ: str | None = None,
) -> list[dict]:
    start_dt = datetime(start.year, start.month, start.day, 0, 0, 0)
    end_dt_excl = datetime(end.year, end.month, end.day, 0, 0, 0) + timedelta(days=1)

    device_id = (device_id or "").strip() or None
    app = (app or "").strip().lower() or None
    carrier = (carrier or "").strip().lower() or None
    roaming = (roaming or "").strip().lower() or None
    typ = (typ or "").strip().lower() or None

    out = []
    for r in rows:
        dt: datetime = r["dt"]
        if not (start_dt <= dt < end_dt_excl):
            continue
        if device_id and r["device_id"] != device_id:
            continue
        if app and app not in (r["application_id"] or "").lower():
            continue
        if carrier and carrier not in (r["carrier"] or "").lower():
            continue
        if roaming and roaming != (r["roaming"] or "").lower():
            continue
        if typ and typ != (r["type"] or "").lower():
            continue
        out.append(r)
    return out


# =============================================================================
# SQL (RAPOR 1)
# =============================================================================
BASE_INNER_SQL = """
SELECT
    (
        SELECT MAX(TO_NUMBER(pv.VALUETEXTTUR))
        FROM CONTENTCC_C_PROPERTYVALUE pv
        JOIN CONTENTCC_C_TPROPERTYVALUE tpv ON pv.PROPERTYVALUEID = tpv.PROPERTYVALUEID
        WHERE pv.PROPERTYID = 19
          AND tpv.TITLEID = tl.TITLEID
    ) AS HEDEFKODU,
    (pl.NAME || ',' || adr.DISTRICTNAME || ',' || adr.CITYNAME) AS MUSTERI,
    ppb.DATECREATED,
    srv.TIMESTARTS AS SERVICETIMESTARTS,
    ppb.DATEMODIFIED,
    (
        SELECT InvoiceCode
        FROM (
            SELECT i.Code AS InvoiceCode,
                   ROW_NUMBER() OVER (ORDER BY ip.InvoiceID DESC) rn
            FROM Finance_InvoicePackage ip
            JOIN Finance_Invoice i ON i.InvoiceID = ip.InvoiceID
            WHERE ip.TradingPackageID = ppb.PackageID
        )
        WHERE rn = 1
    ) AS INVOICECODE,
    CASE ppb.DELIVERYVALIDATIONTYPE
        WHEN 1 THEN 'Kart Okutuldu'
        WHEN 0 THEN 'Kart Okutulmadı'
        ELSE NULL
    END AS DELIVERYVALIDATIONTYPE_TEXT,
    ppb.BARCODE,
    CASE ppb.SHIPPINGSTATUS
        WHEN 0 THEN 'Sevk Bekliyor'
        WHEN 1 THEN 'Sevk Kontrolü Yapıldı'
        WHEN 2 THEN 'Sevk Başladı'
        WHEN 3 THEN 'Transfer Edildi'
        WHEN 4 THEN 'Teslim Edildi'
        WHEN 5 THEN 'Teslim Edilemedi'
        WHEN 99 THEN 'İptal Edildi'
        ELSE NULL
    END AS SHIPPINGSTATUS_TEXT,
    CASE srv.ISEXPRESSSERVICE
        WHEN 1 THEN 'Hızlı Teslimat'
        WHEN 0 THEN 'Servis'
        ELSE NULL
    END AS SERVICETYPE_TEXT,
    reg.NAMETUR AS PACKAGEREGIONNAMETUR,
    srv.NAMETUR AS SERVICENAMETUR,
    wh.NAMETUR AS PACKAGEWAREHOUSENAMETUR,
    dep.NAMETUR AS DEPARTUREMANAGERNAMETUR,
    arr.NAMETUR AS ARRIVALMANAGERNAMETUR,
    drv.NAME AS SERVICEDRIVERNAME,
    veh.PLATE AS SERVICEVEHICLEPLATE,
    (
        SELECT DeliveredPoint
        FROM (
            SELECT sp.DeliveredPoint,
                   ROW_NUMBER() OVER (ORDER BY sp.DeliveredTime DESC) rn
            FROM Transportation_ServicePackage sp
            WHERE sp.PackagePackBarcodeID = ppb.PackagePackBarcodeID
              AND sp.IsDelivered = 1
        )
        WHERE rn = 1
    ) AS DELIVEREDLOCATION
FROM TRS_PACKAGEPACKBARCODE ppb
JOIN TRANSPORTATION_PACKAGE pkg ON pkg.PACKAGEID = ppb.PACKAGEID
LEFT JOIN TRANSPORTATION_REGION reg ON reg.REGIONID = pkg.REGIONID
LEFT JOIN TRANSPORTATION_SERVICE srv ON srv.SERVICEID = ppb.SERVICEID
LEFT JOIN TRANSPORTATION_VEHICLE veh ON veh.VEHICLEID = srv.VEHICLEID
LEFT JOIN COLLABORATION_NODE wh ON wh.NODEID = pkg.WAREHOUSEID
LEFT JOIN COLLABORATION_NODE dep ON dep.NODEID = ppb.DEPARTUREMANAGERID
LEFT JOIN COLLABORATION_NODE arr ON arr.NODEID = ppb.ARRIVALMANAGERID
LEFT JOIN COLLABORATION_EMPLOYEE drv ON drv.EMPLOYEEID = srv.DRIVERID
LEFT JOIN TRADING_LINK pl ON pl.LINKID = pkg.LINKID
INNER JOIN TRADING_LINK tl ON tl.LINKID = pkg.LINKID
LEFT JOIN CONTACTCC_CONTACT c ON c.CONTACTID = pl.CONTACTID
LEFT JOIN CONTACTCC_ADDRESS adr ON adr.ADDRESSID = c.DEFAULTADDRESSID
WHERE pl.NAME NOT LIKE '%All%'
  AND reg.NAMETUR IS NOT NULL
  AND reg.NAMETUR <> 'NULL'
  AND ppb.DATECREATED >= :start_date
  AND ppb.DATECREATED < :end_date
"""


def filtered_outer_sql(with_order: bool) -> str:
    where = """
        WHERE 1=1
          AND (:barcode IS NULL OR LOWER(t.barcode) LIKE LOWER(:barcode_like))
          AND (:invoice IS NULL OR NVL(LOWER(t.invoicecode),'') = LOWER(:invoice))
          AND (:status IS NULL OR t.shippingstatus_text = :status)
          AND (:driver IS NULL OR LOWER(NVL(t.servicedrivername,'')) LIKE LOWER(:driver_like))
          AND (:region IS NULL OR LOWER(NVL(t.packageregionnametur,'')) LIKE LOWER(:region_like))
          AND (:warehouse IS NULL OR LOWER(NVL(t.packagewarehousenametur,'')) LIKE LOWER(:warehouse_like))
          AND (:departure IS NULL OR LOWER(NVL(t.departuremanagernametur,'')) LIKE LOWER(:departure_like))
          AND (:card IS NULL OR t.deliveryvalidationtype_text = :card)
          AND (:service IS NULL OR LOWER(NVL(t.servicenametur,'')) LIKE LOWER(:service_like))
    """
    order = " ORDER BY t.datecreated DESC " if with_order else ""
    return f"""
        SELECT * FROM (
            {BASE_INNER_SQL}
        ) t
        {where}
        {order}
    """


# =========================
# app/main.py (UPDATED) - PART 2/2
# Paste this after PART 1/2 (continue from count_sql() down to end of file)
# =========================
def count_sql() -> str:
    return f"SELECT COUNT(1) AS total FROM ( {filtered_outer_sql(with_order=False)} )"


def paged_sql() -> str:
    return f"""
        SELECT * FROM (
            {filtered_outer_sql(with_order=True)}
        )
        OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY
    """


def build_params(
    start: date,
    end: date,
    barcode: str | None,
    invoice: str | None,
    status: str | None,
    driver: str | None,
    region: str | None,
    warehouse: str | None,
    departure: str | None,
    card: str | None,
    service: str | None,
) -> dict:
    start_dt = _date_start_dt(start)
    end_dt = _date_end_dt_exclusive(end)

    barcode = (barcode or "").strip() or None
    invoice = (invoice or "").strip() or None
    status = (status or "").strip() or None
    driver = (driver or "").strip() or None
    region = (region or "").strip() or None
    warehouse = (warehouse or "").strip() or None
    departure = (departure or "").strip() or None
    card = (card or "").strip() or None
    service = (service or "").strip() or None

    return {
        "start_date": start_dt,
        "end_date": end_dt,
        "barcode": barcode,
        "barcode_like": f"{barcode}%" if barcode else None,
        "invoice": invoice,
        "status": status,
        "driver": driver,
        "driver_like": f"%{driver}%" if driver else None,
        "region": region,
        "region_like": f"%{region}%" if region else None,
        "warehouse": warehouse,
        "warehouse_like": f"%{warehouse}%" if warehouse else None,
        "departure": departure,
        "departure_like": f"%{departure}%" if departure else None,
        "card": card,
        "service": service,
        "service_like": f"%{service}%" if service else None,
    }


# =============================================================================
# PAGES
# =============================================================================
@app.get("/", include_in_schema=False)
def root():
    return RedirectResponse(url="/rapor")


@app.get("/rapor", response_class=HTMLResponse)
def rapor_page(request: Request):
    return templates.TemplateResponse("rapor.html", {**base_context(request)})


@app.get("/rapor2", response_class=HTMLResponse)
def rapor2_page(request: Request):
    return templates.TemplateResponse("rapor2.html", {**base_context(request)})


@app.get("/rapor_pesin", response_class=HTMLResponse)
def rapor_pesin_page(request: Request):
    return templates.TemplateResponse("rapor_pesin.html", {**base_context(request)})


@app.get("/musteriler", response_class=HTMLResponse)
def musteriler_page(request: Request):
    return templates.TemplateResponse("musteriler.html", {**base_context(request)})


# =============================================================================
# TELEKOM PAGE + API ✅ NEW
# =============================================================================
@app.get("/telekom", response_class=HTMLResponse)
def telekom_page(
    request: Request,
    start: date | None = None,
    end: date | None = None,
):
    # default: first day of current month -> today-1
    today = date.today()
    if start is None:
        start = date(today.year, today.month, 1)
    if end is None:
        end = today - timedelta(days=1)

    q = {"start": str(start), "end": str(end)}
    return templates.TemplateResponse(
        "telekom.html",
        {**base_context(request), "q": q, "csv_path": str(TELEKOM_CSV_PATH)},
    )


@app.get("/api/telekom")
def telekom_api(
    start: date | None = None,
    end: date | None = None,
    device_id: str | None = None,
    app: str | None = None,
    carrier: str | None = None,
    roaming: str | None = None,
    typ: str | None = None,
    page: int = 1,
    page_size: int = 50,
):
    try:
        today = date.today()
        if start is None:
            start = date(today.year, today.month, 1)
        if end is None:
            end = today - timedelta(days=1)

        all_rows = read_telekom_csv_rows()
        rows = filter_telekom_rows(
            all_rows,
            start=start,
            end=end,
            device_id=device_id,
            app=app,
            carrier=carrier,
            roaming=roaming,
            typ=typ,
        )

        total = len(rows)
        toplam_upload = sum(r["upload_kb"] for r in rows)
        toplam_download = sum(r["download_kb"] for r in rows)
        toplam_volume = sum(r["volume_kb"] for r in rows)
        cihaz_sayisi = len({r["device_id"] for r in rows if r["device_id"]})
        uygulama_sayisi = len({r["application_id"] for r in rows if r["application_id"]})

        page = max(1, page)
        page_size = min(max(1, page_size), 500)
        start_i = (page - 1) * page_size
        end_i = start_i + page_size
        page_rows = rows[start_i:end_i]

        out_rows = []
        for r in page_rows:
            out_rows.append(
                {
                    "date_time": r["dt"].strftime("%Y-%m-%d %H:%M:%S"),
                    "device_name": r["device_name"],
                    "device_id": r["device_id"],
                    "application_id": r["application_id"],
                    "carrier": r["carrier"],
                    "type": r["type"],
                    "roaming": r["roaming"],
                    "upload_kb": round(r["upload_kb"], 2),
                    "download_kb": round(r["download_kb"], 2),
                    "volume_kb": round(r["volume_kb"], 2),
                }
            )

        return {
            "range": {"start": str(start), "end": str(end)},
            "csv": {
                "path": str(TELEKOM_CSV_PATH),
                "exists": TELEKOM_CSV_PATH.exists(),
                "read_rows": len(all_rows),
            },
            "page": page,
            "page_size": page_size,
            "total": total,
            "kpi": {
                "toplam_upload_kb": round(toplam_upload, 2),
                "toplam_download_kb": round(toplam_download, 2),
                "toplam_volume_kb": round(toplam_volume, 2),
                "kayit_sayisi": total,
                "cihaz_sayisi": cihaz_sayisi,
                "uygulama_sayisi": uygulama_sayisi,
            },
            "rows": out_rows,
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


# =============================================================================
# REALIZE PAGE
# =============================================================================
@app.get("/realize", response_class=HTMLResponse)
def realize_page(
    request: Request,
    start: date = Query(default_factory=date.today),
    end: date = Query(default_factory=date.today),
    seller: str | None = None,
    group: str | None = None,
    status: str | None = None,
):
    q = {
        "start": str(start) if start else "",
        "end": str(end) if end else "",
        "seller": (seller or ""),
        "group": (group or ""),
        "status": (status or ""),
    }
    rows_grouped = []
    totals = {
        "ilac_hedef": 0,
        "ilac_satis": 0,
        "ilac_rea": 0,
        "ilacdisi_hedef": 0,
        "ilacdisi_satis": 0,
        "ilacdisi_rea": 0,
        "depo_hedef": 0,
        "depo_satis": 0,
        "depo_rea": 0,
        "ilac_adet": 0,
        "ilacdisi_adet": 0,
    }
    groups = []
    return templates.TemplateResponse(
        "realize.html",
        {
            **base_context(request),
            "q": q,
            "rows_grouped": rows_grouped,
            "totals": totals,
            "groups": groups,
            "last_updated": datetime.now().strftime("%d.%m.%Y %H:%M"),
            "period_label": "OCAK",
            "total_rows": 0,
            "export_url": "#",
        },
    )


# =============================================================================
# AUTO LOGIN (GET/POST) + ALIASES
# =============================================================================
def _render_auto_login(request: Request, users, error: Optional[str], result: Optional[str]):
    return templates.TemplateResponse(
        "auto_login.html",
        {
            **base_context(request),
            "users": users,
            "error": error,
            "result": result,
        },
    )


@app.get("/auto_login", response_class=HTMLResponse)
@app.get("/auto-login", response_class=HTMLResponse)
def auto_login_page(request: Request):
    excel_path = get_excel_path()
    users = load_users_from_excel(excel_path)
    error = None
    if not excel_path.exists():
        error = f"Excel not found: {excel_path}"
    elif len(users) == 0:
        error = (
            "Excel loaded but empty or headers mismatch. Required headers: customer_code, username, password. "
            f"File: {excel_path}"
        )
    return _render_auto_login(request, users, error, None)


@app.post("/auto_login", response_class=HTMLResponse)
@app.post("/auto-login", response_class=HTMLResponse)
def auto_login_submit(
    request: Request,
    target_key: str = Form(...),
    selected_customer_code: str = Form(...),
):
    excel_path = get_excel_path()
    users = load_users_from_excel(excel_path)

    if target_key not in TARGETS:
        return _render_auto_login(request, users, "Invalid domain selection.", None)
    if not excel_path.exists():
        return _render_auto_login(request, [], f"Excel not found: {excel_path}", None)
    if not users:
        return _render_auto_login(
            request,
            users,
            "Excel is empty or column headers do not match. "
            "Required: customer_code, username, password",
            None,
        )

    chosen = next((u for u in users if u["customer_code"] == selected_customer_code), None)
    if not chosen:
        return _render_auto_login(request, users, "Selected customer_code not found in Excel.", None)

    url = TARGETS[target_key]
    try:
        run_auto_login(
            url=url,
            customer_code=chosen["customer_code"],
            username=chosen["username"],
            password=chosen["password"],
        )
    except Exception as e:
        return _render_auto_login(request, users, f"Auto login failed: {str(e)}", None)

    label = chosen["musteri"].strip()
    if label:
        result = f"Auto login triggered: {url} | {label} | {chosen['customer_code']} / {chosen['username']}"
    else:
        result = f"Auto login triggered: {url} | {chosen['customer_code']} / {chosen['username']}"
    return _render_auto_login(request, users, None, result)


# =============================================================================
# HEALTH
# =============================================================================
@app.get("/api/health")
def health():
    try:
        row = fetch_one("SELECT 1 AS ok FROM dual")
        return {"db": "ok", "ok": row.get("ok", 1) if row else 1}
    except Exception as e:
        return JSONResponse(status_code=500, content={"db": "fail", "error": str(e)})


# =============================================================================
# RAPOR 1 API
# =============================================================================
@app.get("/api/rapor")
def rapor_data(
    start: date = Query(default_factory=date.today),
    end: date = Query(default_factory=date.today),
    barcode: str | None = None,
    invoice: str | None = None,
    status: str | None = None,
    driver: str | None = None,
    region: str | None = None,
    warehouse: str | None = None,
    departure: str | None = None,
    card: str | None = None,
    service: str | None = None,
    page: int = 1,
    page_size: int = 50,
):
    try:
        page = max(1, page)
        page_size = min(max(1, page_size), 500)

        params = build_params(start, end, barcode, invoice, status, driver, region, warehouse, departure, card, service)

        total_row = fetch_one(count_sql(), params)
        total = int(total_row["total"]) if total_row and total_row.get("total") is not None else 0

        params_items = dict(params)
        params_items["offset"] = (page - 1) * page_size
        params_items["limit"] = page_size
        rows = fetch_all(paged_sql(), params_items)

        kpi_sql = f"""
            SELECT shippingstatus_text AS durum, COUNT(1) AS adet
            FROM ( {filtered_outer_sql(with_order=False)} )
            GROUP BY shippingstatus_text
            ORDER BY COUNT(1) DESC
        """
        kpi_rows = fetch_all(kpi_sql, params)

        kpis = []
        for r in kpi_rows:
            adet = int(r.get("adet") or 0)
            yuzde = round((adet / total) * 100, 2) if total else 0
            kpis.append({"durum": r.get("durum"), "adet": adet, "yuzde": yuzde})

        return {
            "range": {"start": str(start), "end": str(end)},
            "page": page,
            "page_size": page_size,
            "total": total,
            "kpis": kpis,
            "rows": rows,
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/api/rapor/group")
def rapor_group(
    start: date = Query(default_factory=date.today),
    end: date = Query(default_factory=date.today),
    barcode: str | None = None,
    invoice: str | None = None,
    status: str | None = None,
    driver: str | None = None,
    region: str | None = None,
    warehouse: str | None = None,
    departure: str | None = None,
    card: str | None = None,
    service: str | None = None,
    group_by: str = "service",
):
    try:
        params = build_params(start, end, barcode, invoice, status, driver, region, warehouse, departure, card, service)

        group_map = {
            "service": "t.servicenametur",
            "customer": "t.musteri",
            "region": "t.packageregionnametur",
            "warehouse": "t.packagewarehousenametur",
            "departure": "t.departuremanagernametur",
            "card": "t.deliveryvalidationtype_text",
            "status": "t.shippingstatus_text",
            "driver": "t.servicedrivername",
        }
        col = group_map.get(group_by, "t.servicenametur")

        sql = f"""
            SELECT NVL({col}, '-') AS label, COUNT(1) AS adet
            FROM ( {filtered_outer_sql(with_order=False)} ) t
            GROUP BY {col}
            ORDER BY COUNT(1) DESC
        """
        items_raw = fetch_all(sql, params)

        total = sum(int(x.get("adet") or 0) for x in items_raw) or 0
        items = []
        for x in items_raw[:200]:
            adet = int(x.get("adet") or 0)
            yuzde = round((adet / total) * 100, 2) if total else 0
            items.append({"label": x.get("label"), "adet": adet, "yuzde": yuzde})

        return {"group_by": group_by, "total": total, "items": items}
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


# =============================================================================
# TREND KPI
# =============================================================================
@app.get("/api/rapor/trend-kpi")
def rapor_trend_kpi(
    months: int = Query(default=6, ge=1, le=24),
    start: date | None = None,
    end: date | None = None,
):
    try:
        if start and end:
            use_start = _date_start_dt(start)
            use_end = _date_end_dt_exclusive(end)
        else:
            now = datetime.now()
            cur_month = _first_day_of_month(now)
            start_month = _add_months(cur_month, -(months - 1))
            end_month_excl = _add_months(cur_month, 1)
            use_start = start_month
            use_end = end_month_excl

        sql = """
            SELECT
                TO_CHAR(TRUNC(ppb.DATECREATED, 'MM'), 'YYYY-MM') AS ay,
                SUM(CASE WHEN ppb.SHIPPINGSTATUS = 4 THEN 1 ELSE 0 END) AS teslim_edildi,
                SUM(CASE WHEN ppb.SHIPPINGSTATUS = 0 THEN 1 ELSE 0 END) AS sevk_bekliyor,
                SUM(CASE WHEN ppb.DELIVERYVALIDATIONTYPE = 0 THEN 1 ELSE 0 END) AS kart_okutulmadi
            FROM TRS_PACKAGEPACKBARCODE ppb
            WHERE ppb.DATECREATED >= :start_dt
              AND ppb.DATECREATED < :end_dt
            GROUP BY TRUNC(ppb.DATECREATED, 'MM')
            ORDER BY TRUNC(ppb.DATECREATED, 'MM')
        """
        rows = fetch_all(sql, {"start_dt": use_start, "end_dt": use_end})

        def pct(curr: int, prev: int):
            if prev == 0:
                return None
            return round(((curr - prev) / prev) * 100, 2)

        out = []
        prev = None
        for r in rows:
            cur = {
                "ay": r.get("ay"),
                "teslim_edildi": int(r.get("teslim_edildi") or 0),
                "sevk_bekliyor": int(r.get("sevk_bekliyor") or 0),
                "kart_okutulmadi": int(r.get("kart_okutulmadi") or 0),
            }
            if prev:
                cur["degisim"] = {
                    "teslim_edildi_pct": pct(cur["teslim_edildi"], prev["teslim_edildi"]),
                    "sevk_bekliyor_pct": pct(cur["sevk_bekliyor"], prev["sevk_bekliyor"]),
                    "kart_okutulmadi_pct": pct(cur["kart_okutulmadi"], prev["kart_okutulmadi"]),
                }
            else:
                cur["degisim"] = {
                    "teslim_edildi_pct": None,
                    "sevk_bekliyor_pct": None,
                    "kart_okutulmadi_pct": None,
                }
            out.append(cur)
            prev = cur

        return {"months": out}
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/api/trend-kpi")
def trend_kpi_alias(
    months: int = Query(default=6, ge=1, le=24),
    start: date | None = None,
    end: date | None = None,
):
    return rapor_trend_kpi(months=months, start=start, end=end)


# =============================================================================
# EXCEL EXPORT (RAPOR 1)
# =============================================================================
def _make_xlsx(headers: list[str], rows: list[dict], sheet_name: str = "Rapor") -> bytes:
    wb = Workbook()
    ws = wb.active
    ws.title = sheet_name
    ws.append(headers)
    for r in rows:
        ws.append([r.get(h) for h in headers])
    for i, h in enumerate(headers, start=1):
        max_len = len(h)
        for rr in rows[:300]:
            val = rr.get(h)
            if val is None:
                continue
            s = str(val)
            if len(s) > max_len:
                max_len = len(s)
        ws.column_dimensions[get_column_letter(i)].width = max(12, min(48, max_len + 2))

    import io
    bio = io.BytesIO()
    wb.save(bio)
    return bio.getvalue()


@app.get("/api/rapor/export.xlsx")
def export_xlsx(
    start: date = Query(default_factory=date.today),
    end: date = Query(default_factory=date.today),
    barcode: str | None = None,
    invoice: str | None = None,
    status: str | None = None,
    driver: str | None = None,
    region: str | None = None,
    warehouse: str | None = None,
    departure: str | None = None,
    card: str | None = None,
    service: str | None = None,
):
    try:
        params = build_params(start, end, barcode, invoice, status, driver, region, warehouse, departure, card, service)
        sql = filtered_outer_sql(with_order=True)
        rows = fetch_all(sql, params)

        headers = [
            "datecreated",
            "hedefkodu",
            "musteri",
            "barcode",
            "invoicecode",
            "shippingstatus_text",
            "servicetype_text",
            "packageregionnametur",
            "servicenametur",
            "packagewarehousenametur",
            "departuremanagernametur",
            "arrivalmanagernametur",
            "servicedrivername",
            "servicevehicleplate",
            "deliveredlocation",
            "deliveryvalidationtype_text",
            "servicetimestarts",
            "datemodified",
        ]
        data = _make_xlsx(headers, rows, sheet_name="Sevkiyat")
        filename = f"sevkiyat_{start}_{end}.xlsx"
        return StreamingResponse(
            iter([data]),
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            headers={"Content-Disposition": f'attachment; filename="{filename}"'},
        )
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/api/export.xlsx")
def export_xlsx_alias(
    start: date = Query(default_factory=date.today),
    end: date = Query(default_factory=date.today),
    barcode: str | None = None,
    invoice: str | None = None,
    status: str | None = None,
    driver: str | None = None,
    region: str | None = None,
    warehouse: str | None = None,
    departure: str | None = None,
    card: str | None = None,
    service: str | None = None,
):
    return export_xlsx(
        start=start,
        end=end,
        barcode=barcode,
        invoice=invoice,
        status=status,
        driver=driver,
        region=region,
        warehouse=warehouse,
        departure=departure,
        card=card,
        service=service,
    )


# -----------------------------------------------------------------------------
# Updated CSV helpers: the redefinitions below override the earlier
# read_telekom_csv_rows() and _parse_dt_telekom() above when the module loads.
# -----------------------------------------------------------------------------
def read_telekom_csv_rows() -> list[dict]:
    if not TELEKOM_CSV_PATH.exists():
        return []

    # Read the CSV with automatic delimiter/dialect detection + normalize headers/fields
    rows: list[dict] = []
    with open(TELEKOM_CSV_PATH, "r", encoding="utf-8-sig", newline="") as f:
        sample = f.read(4096)
        f.seek(0)
        try:
            dialect = csv.Sniffer().sniff(sample, delimiters=[",", ";", "\t", "|"])
        except Exception:
            dialect = csv.excel  # fallback: comma

        reader = csv.DictReader(f, dialect=dialect)

        # Trim the headers (whitespace/BOM etc.)
        if reader.fieldnames:
            reader.fieldnames = [str(x or "").strip() for x in reader.fieldnames]

        # Flexible header matching
        def pick(d: dict, *keys: str) -> str:
            # 1) exact match
            for k in keys:
                if k in d:
                    return d.get(k) or ""
            # 2) trimmed/lowercased match
            low = {str(k).strip().lower(): k for k in d.keys()}
            for k in keys:
                kk = k.strip().lower()
                if kk in low:
                    return d.get(low[kk]) or ""
            return ""

        for r in reader:
            if not r:
                continue
            dt_raw = pick(r, "Date & Time", "Date&Time", "Date Time", "DateTime")
            dt = _parse_dt_telekom(dt_raw)
            if not dt:
                continue
            rows.append(
                {
                    "dt": dt,
                    "device_name": pick(r, "Device Name").strip(),
                    "device_id": pick(r, "Device ID").strip(),
                    "application_id": pick(r, "Application ID").strip(),
                    "carrier": pick(r, "Carrier").strip(),
                    "type": pick(r, "Type").strip(),
                    "roaming": pick(r, "Roaming").strip(),
                    "upload_kb": _parse_float(pick(r, "Upload (KB)", "Upload(KB)")),
                    "download_kb": _parse_float(pick(r, "Download (KB)", "Download(KB)")),
                    "volume_kb": _parse_float(pick(r, "Volume (KB)", "Volume(KB)")),
                }
            )
    return rows


def _parse_dt_telekom(s: str) -> datetime | None:
    if not s:
        return None
    t = " ".join(str(s).strip().replace("\u00a0", " ").split())  # non-breaking space fix
    fmts = [
        "%m/%d/%y %I:%M:%S %p",   # 2/6/26 3:33:11 PM
        "%m/%d/%Y %I:%M:%S %p",
        "%d.%m.%Y %H:%M:%S",      # 06.02.2026 15:33:11
        "%d.%m.%y %H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
    ]
    for f in fmts:
        try:
            return datetime.strptime(t, f)
        except Exception:
            pass
    return None
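For a quick check that the CSV helpers parse app/data/telekom1.csv as expected, a small script along the lines below can be run from the project root before wiring up the page. It is a minimal sketch, not part of the file above: the February 2026 date range is a placeholder, and it assumes the rest of the project package (app.db, app.musteriler and the templates folder, all imported or referenced by main.py) is in place so that app.main can be imported. The same filters are exposed over HTTP by the /api/telekom endpoint defined above.

# check_telekom_csv.py (illustrative sketch; run from the project root)
from datetime import date

from app.main import read_telekom_csv_rows, filter_telekom_rows

# Parse the whole CSV, then narrow it to a placeholder date range
all_rows = read_telekom_csv_rows()
feb_rows = filter_telekom_rows(all_rows, start=date(2026, 2, 1), end=date(2026, 2, 28))

print(f"{len(all_rows)} rows parsed, {len(feb_rows)} in range")
print(f"total volume in range: {round(sum(r['volume_kb'] for r in feb_rows), 2)} KB")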