#!/usr/bin/env python3
"""
KT Scraper v8.0 — полный сбор данных с листинга
Использует contexts[0] — браузер с живой сессией Qrator.
ПОЛЯ на выходе:
oid — ID товара euroauto
name_full — "Топливный насос Auger 71593"
attr — "нижний, d=55мм" (признак/описание)
brand — производитель детали
price — цена
href — ссылка на товар
image_url — картинка (из HTML или строим из oid)
country — страна производства
parts_info — текст совместимости из карточки
category — категория детали
truck_brand — марка грузовика (из URL)
truck_model — модель (из URL)
truck_mod — модификация (из URL)
source — исходный URL страницы
ПАРАМЕТРЫ:
--tabs N Число вкладок (default: 3)
--delay N Задержка между запросами (default: 1.0)
--resume Продолжить с последней позиции
--no-cache Не использовать HTML-кеш
--test N Скрапить только первые N URL
--port N CDP порт браузера (default: 9223)
"""
import argparse
import asyncio
import hashlib
import html as html_lib
import json
import re
import sys
from datetime import datetime
from pathlib import Path
# --- Filesystem layout (everything lives under ~/SERVER/dataspace) ---
BASE_DIR = Path.home() / "SERVER/dataspace"
DL_DIR = BASE_DIR / "dl"
UP_DIR = BASE_DIR / "up"
CACHE_DIR = BASE_DIR / "cache" / "euroauto"
# Input: one listing URL per line.
URLS_FILE = DL_DIR / "rescrape_urls.txt"
# Output: JSONL, timestamped per run so a rerun never clobbers earlier results.
OUTPUT_FILE = UP_DIR / f"scraped_parts_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"
# Progress marker ("done=<count>"), consumed by --resume.
PROGRESS_FILE = UP_DIR / "scraper_progress.txt"
# --- HTML extraction patterns, applied to raw listing-page markup ---
# JSON payload embedded in each product tile's data-datalayer-product attribute.
PRODUCT_RE = re.compile(r'data-datalayer-product="({[^"]+})"')
# Schema.org microdata fields: full name, short attribute line, image src.
DESC_RE = re.compile(r'itemprop="description"\s+content="([^"]+)"')
ATTR_RE = re.compile(r'itemprop="name"\s+content="([^"]+)"')
IMG_RE = re.compile(r'itemprop="image"\s[^>]+src="([^"]+)"')
# Country-of-origin value; DOTALL because the value may sit on a later line.
COUNTRY_RE = re.compile(r'card__param country[^>]+>.*?card__param-value">\s*([^<\s][^<]*)', re.DOTALL)
# Compatibility text taken from the tile's title-bearing parts-info element.
INFO_RE = re.compile(r'card__parts-info"[^>]+title="[^"]+">([^<]+)')
# Catalog URL layout: /catalog/<category>/brand-<brand>/[model-<model>/][modification-<mod>/]
SOURCE_RE = re.compile(r'/catalog/([^/]+)/brand-([^/?]+)/(?:model-([^/?]+)/)?(?:modification-([^/?]+)/)?')
# --- CLI; parsed at import time (see the module docstring for parameter meaning) ---
parser = argparse.ArgumentParser()
parser.add_argument("--tabs", type=int, default=3)
parser.add_argument("--delay", type=float, default=1.0)
parser.add_argument("--resume", action="store_true")
parser.add_argument("--no-cache", action="store_true")
parser.add_argument("--test", type=int, default=0)
parser.add_argument("--port", type=int, default=9223)
args = parser.parse_args()
# --- Shared run state, mutated by the worker coroutines ---
total_found = 0  # products written to the output file this run
total_done = 0   # URLs finished this run
done_offset = 0  # URLs completed in previous runs (restored by --resume)
load_times = []  # per-page load durations in seconds, for the "avg" stat line
def url_cache_path(url: str) -> Path:
    """Map *url* to its on-disk HTML cache file (md5-named, 2-char sharded)."""
    digest = hashlib.md5(url.encode()).hexdigest()
    shard = digest[:2]
    return CACHE_DIR / shard / (digest + ".html")
def parse_source_url(url: str) -> dict:
    """Pull category / truck brand / model / modification out of a catalog URL.

    Returns a dict with empty strings for any component the URL lacks.
    """
    match = SOURCE_RE.search(url)
    if not match:
        return {"category": "", "truck_brand": "", "truck_model": "", "truck_mod": ""}
    category, brand, model, mod = match.groups()
    return {
        "category": category or "",
        "truck_brand": brand or "",
        "truck_model": model or "",
        # Normalize the modification slug: drop a trailing dash, dashes -> underscores.
        "truck_mod": (mod or "").rstrip("-").replace("-", "_"),
    }
def parse_products(html: str, source_url: str) -> list:
    """Extract product dicts from one listing page's HTML.

    Each tile's embedded datalayer JSON supplies the core fields; microdata
    scraped in document order (descriptions, attributes, images, countries,
    compatibility) is matched to tiles positionally. Truck brand/model/mod
    derived from *source_url* are attached to every product.
    """
    src = parse_source_url(source_url)
    payloads = PRODUCT_RE.findall(html)
    descs = DESC_RE.findall(html)
    attrs = ATTR_RE.findall(html)
    imgs = IMG_RE.findall(html)
    countries = COUNTRY_RE.findall(html)
    infos = INFO_RE.findall(html)

    def nth(seq, idx):
        # Parallel lists are aligned by position; a missing tail entry becomes "".
        return seq[idx].strip() if idx < len(seq) else ""

    products = []
    for i, payload in enumerate(payloads):
        try:
            data = json.loads(html_lib.unescape(payload))
            oid = data.get("original_id")
            if not oid:
                continue
            # Prefer the image found in the markup; fall back to the canonical
            # file-server path when it is absent or a "no-photo" placeholder.
            raw_img = imgs[i] if i < len(imgs) else ""
            if raw_img and "no-photo" not in raw_img:
                image_url = raw_img
            else:
                image_url = f"https://file.euroauto.ru/v2/file/parts/new/{oid}/1.jpg"
            products.append({
                "oid": str(oid),
                "name_full": descs[i].strip() if i < len(descs) else data.get("name", ""),
                "attr": nth(attrs, i),
                "brand": data.get("brand", ""),
                "price": str(data.get("price", "")),
                "href": f"https://euroauto.ru/part/new/{oid}/",
                "image_url": image_url,
                "country": nth(countries, i),
                "parts_info": nth(infos, i),
                "category": data.get("category", "") or src["category"],
                "truck_brand": src["truck_brand"],
                "truck_model": src["truck_model"],
                "truck_mod": src["truck_mod"],
                "source": source_url,
            })
        except Exception:
            # Best-effort: a malformed tile is skipped, the rest of the page survives.
            continue
    return products
async def scrape_page(page, url: str) -> list:
    """Fetch one listing URL (cache-first) and return its parsed products.

    Reads HTML from the on-disk cache when allowed; otherwise navigates the
    tab, waits (best-effort) for product links, retries `page.content()` on
    transient failures, re-polls pages that look blocked, and caches any
    result that looks like a real page. Always sleeps `args.delay` at the
    end as a politeness delay.
    """
    cp = url_cache_path(url)
    if not args.no_cache and cp.exists():
        html = cp.read_text(errors="replace")
    else:
        # get_running_loop() is the modern replacement for get_event_loop()
        # inside a coroutine; .time() is the loop's monotonic clock.
        t0 = asyncio.get_running_loop().time()
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=20000)
            try:
                # Best effort only: give product anchors a chance to render.
                await page.wait_for_selector('a[href*="/part/"]', timeout=10000)
            except Exception:
                pass
            load_times.append(asyncio.get_running_loop().time() - t0)
            html = ""
            # page.content() can fail transiently while the page is still
            # navigating; retry a few times with a short pause.
            for _ in range(4):
                try:
                    html = await page.content()
                    break
                except Exception:
                    await page.wait_for_timeout(1000)
            if not html:
                print(f" SKIP {url[:70]}", flush=True)
                return []
            if len(html) < 3000:
                # Very small responses are treated as anti-bot interstitials
                # (presumably Qrator); wait and re-poll for the real page.
                print(f" BLOCK {url[:70]} ({len(html)}b)", flush=True)
                for _ in range(4):
                    await page.wait_for_timeout(2000)
                    try:
                        html = await page.content()
                        if len(html) > 3000:
                            break
                    except Exception:
                        pass
        except Exception as e:
            print(f" ERR {url[:70]}: {e}", flush=True)
            return []
        # Only cache pages that look real; blocked stubs stay uncached.
        if not args.no_cache and len(html) > 3000:
            cp.parent.mkdir(parents=True, exist_ok=True)
            cp.write_text(html, errors="replace")
    products = parse_products(html, url)
    await asyncio.sleep(args.delay)
    return products
async def worker(tab_id: int, page, queue: asyncio.Queue, out_f):
    """Drain URLs from the shared *queue* on one browser tab.

    Appends each scraped product as a JSONL row to *out_f*, records progress
    after every URL, and prints a stats line (with a flush) every 10 URLs.
    """
    global total_found, total_done
    # No await between the empty() check and get_nowait(), so this cannot race.
    while not queue.empty():
        url = queue.get_nowait()
        rows = await scrape_page(page, url)
        for row in rows:
            out_f.write(json.dumps(row, ensure_ascii=False) + "\n")
            total_found += 1
        total_done += 1
        PROGRESS_FILE.write_text(f"done={done_offset + total_done}")
        if total_done % 10 == 0:
            out_f.flush()
            avg = sum(load_times) / len(load_times) if load_times else 0
            mb = sum(f.stat().st_size for f in CACHE_DIR.rglob("*.html")) // 1024 // 1024
            print(
                f"[{done_offset+total_done}] найдено: {total_found} | avg: {avg:.1f}с | кеш: {mb}MB",
                flush=True,
            )
async def main():
    """Entry point: attach to a running browser over CDP and scrape all URLs.

    Uses the browser's first (already authenticated) context, opens
    ``args.tabs`` tabs, and fans the URL list out to one worker per tab.
    Honors --test (truncate input) and --resume (skip already-done URLs).
    """
    global done_offset
    from playwright.async_api import async_playwright

    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    UP_DIR.mkdir(parents=True, exist_ok=True)
    all_urls = [u.strip() for u in URLS_FILE.read_text().splitlines() if u.strip()]
    if args.test:
        all_urls = all_urls[:args.test]
        print(f"ТЕСТ: {args.test} URL", flush=True)
    if args.resume and PROGRESS_FILE.exists():
        try:
            line = PROGRESS_FILE.read_text().strip()
            if line.startswith("done="):
                done_offset = int(line.split("=")[1])
                print(f"Resume с {done_offset}/{len(all_urls)}", flush=True)
        except Exception:
            # Best-effort resume: a corrupt progress file means start from 0.
            pass
    urls = all_urls[done_offset:]
    print(f"Осталось: {len(urls)} URL | {args.tabs} вкладок | delay={args.delay}s", flush=True)
    async with async_playwright() as p:
        browser = await p.chromium.connect_over_cdp(f"http://localhost:{args.port}")
        if not browser.contexts:
            print("ERROR: нет контекстов в браузере", flush=True)
            sys.exit(1)
        # Reuse the live context so its session cookies (anti-bot pass) apply.
        ctx = browser.contexts[0]
        cookies = await ctx.cookies()
        ea = [c for c in cookies if "euroauto" in c.get("domain", "")]
        print(f"Кук euroauto: {len(ea)} | {[c['name'] for c in ea[:5]]}", flush=True)
        pages = [await ctx.new_page() for _ in range(args.tabs)]
        print(f"Открыто {args.tabs} вкладок в существующем контексте", flush=True)
        queue: asyncio.Queue = asyncio.Queue()
        for url in urls:
            await queue.put(url)
        try:
            # Context manager guarantees flush+close even if a worker raises;
            # explicit utf-8 because rows are written with ensure_ascii=False.
            with open(OUTPUT_FILE, "a", encoding="utf-8") as out_f:
                await asyncio.gather(*[worker(t, pages[t], queue, out_f) for t in range(args.tabs)])
        finally:
            # Always close our tabs so the shared browser isn't left cluttered.
            for pg in pages:
                await pg.close()
    print(f"\nГотово! {total_found} записей → {OUTPUT_FILE}", flush=True)
# Guard the entry point so importing this module never starts a scrape run.
if __name__ == "__main__":
    asyncio.run(main())