erster commit

This commit is contained in:
2026-05-18 15:37:35 +02:00
commit 6927b5c457
3 changed files with 688 additions and 0 deletions

30
.vscode/tasks.json vendored Normal file
View File

@@ -0,0 +1,30 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Build Grabber Config",
"type": "shell",
"command": "${workspaceFolder}/.venv/Scripts/python.exe",
"args": [
"-m", "nuitka",
"--standalone",
"--enable-plugin=multiprocessing",
"--enable-plugin=anti-bloat",
"--enable-plugin=tk-inter",
"--lto=no",
"--msvc=latest",
"--output-dir=build",
"--assume-yes-for-downloads",
"--windows-company-name=Patrick Gniza",
"--windows-product-name=Grabber Config",
"--windows-file-version=1.0.0",
"--windows-product-version=1.0.0",
"--windows-file-description=Grabber Config",
"--windows-icon-from-ico=GrabberConfig.ico",
"GrabberConfig.py"
],
"group": "build",
"problemMatcher": []
}
]
}

BIN
GrabberConfig.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

658
GrabberConfig.py Normal file
View File

@@ -0,0 +1,658 @@
import argparse
import json
import os
import sys
import tkinter as tk
from tkinter import ttk, messagebox
from tkinter.scrolledtext import ScrolledText
import comtypes
import comtypes.client
from comtypes import GUID, COMMETHOD, HRESULT
from comtypes.automation import IUnknown, VARIANT
from ctypes import c_ulong, c_long, POINTER, byref
PROGRAMDATA_DIR = os.path.join(
os.environ.get("PROGRAMDATA", r"C:\ProgramData"),
"GrabberConfig"
)
CONFIG_FILE = os.path.join(PROGRAMDATA_DIR, "config.json")
def ensure_config_dir():
os.makedirs(PROGRAMDATA_DIR, exist_ok=True)
CLSID_SystemDeviceEnum = GUID("{62BE5D10-60EB-11d0-BD3B-00A0C911CE86}")
CLSID_VideoInputDeviceCategory = GUID("{860BB310-5D01-11d0-BD3B-00A0C911CE86}")
CLSID_AM_KSCATEGORY_CROSSBAR = GUID("{A799A801-A46D-11D0-A18C-00A02401DCD4}")
IID_IBaseFilter = GUID("{56A86895-0AD4-11CE-B03A-0020AF0BA770}")
PHYS_CONN_NAMES = {
1: "Video Tuner",
2: "Composite",
3: "S-Video",
4: "RGB",
5: "YRYBY",
6: "Serial Digital",
7: "Parallel Digital",
8: "SCSI",
9: "AUX",
10: "1394",
11: "USB",
12: "Video Decoder",
4096: "Audio Tuner",
4097: "Audio Line",
4098: "Audio Mic",
4099: "AES Digital",
4100: "SPDIF Digital",
4101: "SCSI Audio",
4102: "AUX Audio",
4103: "1394 Audio",
4104: "USB Audio",
4105: "Audio Decoder",
}
VIDEO_STANDARD_NAMES = {
0x00000001: "NTSC_M",
0x00000002: "NTSC_M_J",
0x00000004: "NTSC_433",
0x00000010: "PAL_B",
0x00000020: "PAL_D",
0x00000040: "PAL_G",
0x00000080: "PAL_H",
0x00000100: "PAL_I",
0x00000200: "PAL_M",
0x00000400: "PAL_N",
0x00000800: "PAL_60",
0x00001000: "SECAM_B",
0x00002000: "SECAM_D",
0x00004000: "SECAM_G",
0x00008000: "SECAM_H",
0x00010000: "SECAM_K",
0x00020000: "SECAM_K1",
0x00040000: "SECAM_L",
0x00080000: "SECAM_L1",
0x00100000: "PAL_N_COMBO",
}
class IMoniker(IUnknown):
_iid_ = GUID("{0000000F-0000-0000-C000-000000000046}")
_methods_ = [
COMMETHOD([], HRESULT, "GetClassID", (["out"], POINTER(GUID), "pClassID")),
COMMETHOD([], HRESULT, "IsDirty"),
COMMETHOD([], HRESULT, "Load", (["in"], POINTER(IUnknown), "pStm")),
COMMETHOD([], HRESULT, "Save", (["in"], POINTER(IUnknown), "pStm"), (["in"], c_long, "fClearDirty")),
COMMETHOD([], HRESULT, "GetSizeMax", (["out"], POINTER(c_ulong), "pcbSize")),
COMMETHOD([], HRESULT, "BindToObject", (["in"], POINTER(IUnknown), "pbc"), (["in"], POINTER(IUnknown), "pmkToLeft"), (["in"], POINTER(GUID), "riidResult"), (["out"], POINTER(POINTER(IUnknown)), "ppvResult")),
COMMETHOD([], HRESULT, "BindToStorage", (["in"], POINTER(IUnknown), "pbc"), (["in"], POINTER(IUnknown), "pmkToLeft"), (["in"], POINTER(GUID), "riid"), (["out"], POINTER(POINTER(IUnknown)), "ppvObj")),
]
class IEnumMoniker(IUnknown):
_iid_ = GUID("{00000102-0000-0000-C000-000000000046}")
_methods_ = [
COMMETHOD([], HRESULT, "Next", (["in"], c_ulong, "celt"), (["out"], POINTER(POINTER(IMoniker)), "rgelt"), (["out"], POINTER(c_ulong), "pceltFetched")),
]
class ICreateDevEnum(IUnknown):
_iid_ = GUID("{29840822-5B84-11D0-BD3B-00A0C911CE86}")
_methods_ = [
COMMETHOD([], HRESULT, "CreateClassEnumerator", (["in"], POINTER(GUID), "clsidDeviceClass"), (["out"], POINTER(POINTER(IEnumMoniker)), "ppEnumMoniker"), (["in"], c_ulong, "dwFlags")),
]
class IPropertyBag(IUnknown):
_iid_ = GUID("{55272A00-42CB-11CE-8135-00AA004BB851}")
_methods_ = [
COMMETHOD([], HRESULT, "Read", (["in"], comtypes.c_wchar_p, "pszPropName"), (["out"], POINTER(VARIANT), "pVar"), (["in"], POINTER(IUnknown), "pErrorLog")),
]
class IAMAnalogVideoDecoder(IUnknown):
_iid_ = GUID("{C6E13350-30AC-11d0-A18C-00A0C9118956}")
_methods_ = [
COMMETHOD([], HRESULT, "get_AvailableTVFormats", (["out"], POINTER(c_long), "lAnalogVideoStandard")),
COMMETHOD([], HRESULT, "put_TVFormat", (["in"], c_long, "lAnalogVideoStandard")),
COMMETHOD([], HRESULT, "get_TVFormat", (["out"], POINTER(c_long), "plAnalogVideoStandard")),
]
class IAMCrossbar(IUnknown):
_iid_ = GUID("{C6E13380-30AC-11d0-A18C-00A0C9118956}")
_methods_ = [
COMMETHOD([], HRESULT, "get_PinCounts", (["out"], POINTER(c_long), "OutputPinCount"), (["out"], POINTER(c_long), "InputPinCount")),
COMMETHOD([], HRESULT, "CanRoute", (["in"], c_long, "OutputPinIndex"), (["in"], c_long, "InputPinIndex")),
COMMETHOD([], HRESULT, "Route", (["in"], c_long, "OutputPinIndex"), (["in"], c_long, "InputPinIndex")),
COMMETHOD([], HRESULT, "get_IsRoutedTo", (["in"], c_long, "OutputPinIndex"), (["out"], POINTER(c_long), "InputPinIndex")),
COMMETHOD([], HRESULT, "get_CrossbarPinInfo", (["in"], c_long, "IsInputPin"), (["in"], c_long, "PinIndex"), (["out"], POINTER(c_long), "PinIndexRelated"), (["out"], POINTER(c_long), "PhysicalType")),
]
def create_dev_enum():
return comtypes.client.CreateObject(CLSID_SystemDeviceEnum, interface=ICreateDevEnum)
def list_filters(category_guid):
dev_enum = create_dev_enum()
enum_moniker = dev_enum.CreateClassEnumerator(byref(category_guid), 0)
if not enum_moniker:
return []
result = []
invalid_count = 0
max_invalid = 10
max_total = 100
total = 0
while total < max_total:
total += 1
monikers = enum_moniker.Next(1)
if not monikers:
break
moniker = monikers[0]
if not moniker:
break
try:
bag_unknown = moniker.BindToStorage(None, None, byref(IPropertyBag._iid_))
if not bag_unknown:
invalid_count += 1
if invalid_count >= max_invalid:
break
continue
bag = bag_unknown.QueryInterface(IPropertyBag)
name = bag.Read("FriendlyName", None)
if name:
result.append({"name": name, "moniker": moniker})
except Exception:
invalid_count += 1
if invalid_count >= max_invalid:
break
continue
return result
def bind_filter(moniker):
return moniker.BindToObject(None, None, byref(IID_IBaseFilter))
def get_filter_by_name(category_guid, target_name):
for item in list_filters(category_guid):
if item["name"] == target_name:
return bind_filter(item["moniker"])
raise RuntimeError(f"Filter nicht gefunden: {target_name}")
def decode_video_standards(mask):
return [(value, name) for value, name in VIDEO_STANDARD_NAMES.items() if mask & value]
def get_decoder(video_filter):
return video_filter.QueryInterface(IAMAnalogVideoDecoder)
def get_crossbar(crossbar_filter):
return crossbar_filter.QueryInterface(IAMCrossbar)
def inspect_crossbar(crossbar):
output_count, input_count = crossbar.get_PinCounts()
inputs = []
outputs = []
for i in range(input_count):
related, physical_type = crossbar.get_CrossbarPinInfo(1, i)
inputs.append({
"index": i,
"related": related,
"physical_type": physical_type,
"name": PHYS_CONN_NAMES.get(physical_type, f"Unknown {physical_type}"),
})
for o in range(output_count):
related, physical_type = crossbar.get_CrossbarPinInfo(0, o)
outputs.append({
"index": o,
"related": related,
"physical_type": physical_type,
"name": PHYS_CONN_NAMES.get(physical_type, f"Unknown {physical_type}"),
})
return inputs, outputs
def find_index_by_name(items, name):
for i, item in enumerate(items):
if item.get("name") == name:
return i
return -1
class VideoGrabberGui(tk.Tk):
def __init__(self, config_path, force_config=False):
super().__init__()
self.config_path = config_path
self.force_config = force_config
self.video_devices = []
self.crossbars = []
self.standards = []
self.inputs = []
self.outputs = []
self.title("Video Grabber Konfiguration")
self.geometry("760x560")
self.minsize(700, 500)
self._build_ui()
self.after(200, self.startup)
def _build_ui(self):
main = ttk.Frame(self, padding=12)
main.pack(fill="both", expand=True)
main.columnconfigure(1, weight=1)
ttk.Label(main, text="Videogerät").grid(row=0, column=0, sticky="w", pady=4)
self.video_combo = ttk.Combobox(main, state="readonly")
self.video_combo.grid(row=0, column=1, sticky="ew", pady=4)
self.video_combo.bind("<<ComboboxSelected>>", lambda e: self.on_video_selected())
ttk.Button(main, text="Aktualisieren", command=self.refresh_devices).grid(row=0, column=2, padx=(8, 0), pady=4)
ttk.Label(main, text="Videostandard").grid(row=1, column=0, sticky="w", pady=4)
self.standard_combo = ttk.Combobox(main, state="readonly")
self.standard_combo.grid(row=1, column=1, sticky="ew", pady=4)
ttk.Label(main, text="Crossbar").grid(row=2, column=0, sticky="w", pady=4)
self.crossbar_combo = ttk.Combobox(main, state="readonly")
self.crossbar_combo.grid(row=2, column=1, sticky="ew", pady=4)
self.crossbar_combo.bind("<<ComboboxSelected>>", lambda e: self.on_crossbar_selected())
ttk.Label(main, text="Eingang").grid(row=3, column=0, sticky="w", pady=4)
self.input_combo = ttk.Combobox(main, state="readonly")
self.input_combo.grid(row=3, column=1, sticky="ew", pady=4)
ttk.Label(main, text="Ausgang").grid(row=4, column=0, sticky="w", pady=4)
self.output_combo = ttk.Combobox(main, state="readonly")
self.output_combo.grid(row=4, column=1, sticky="ew", pady=4)
buttons = ttk.Frame(main)
buttons.grid(row=5, column=0, columnspan=3, sticky="ew", pady=(12, 8))
ttk.Button(buttons, text="Speichern", command=self.save_config).pack(side="left")
ttk.Button(buttons, text="Anwenden", command=self.apply_current_selection).pack(side="left", padx=8)
ttk.Button(buttons, text="Speichern + Anwenden", command=self.save_and_apply).pack(side="left")
ttk.Button(buttons, text="Beenden", command=self.destroy).pack(side="right")
ttk.Label(main, text="Protokoll").grid(row=6, column=0, columnspan=3, sticky="w", pady=(12, 2))
self.log_box = ScrolledText(main, height=16)
self.log_box.grid(row=7, column=0, columnspan=3, sticky="nsew")
main.rowconfigure(7, weight=1)
def log(self, text):
self.log_box.insert("end", text + "\n")
self.log_box.see("end")
self.update_idletasks()
def startup(self):
try:
self.refresh_devices()
if os.path.exists(self.config_path):
self.load_config_into_ui()
if not self.force_config:
self.log("Konfiguration gefunden. Wende gespeicherte Einstellungen an.")
self.apply_config_file()
else:
self.log("Keine config.json gefunden. Bitte Einstellungen auswählen und speichern.")
except Exception as e:
messagebox.showerror("Fehler", str(e))
self.log(f"Fehler: {e}")
def refresh_devices(self):
self.log("Suche DirectShow-Videogeräte...")
self.video_devices = list_filters(CLSID_VideoInputDeviceCategory)
self.video_combo["values"] = [d["name"] for d in self.video_devices]
self.crossbars = list_filters(CLSID_AM_KSCATEGORY_CROSSBAR)
self.crossbar_combo["values"] = [c["name"] for c in self.crossbars]
self.log(f"Videogeräte: {len(self.video_devices)}")
self.log(f"Crossbars: {len(self.crossbars)}")
if self.video_devices and self.video_combo.current() < 0:
self.video_combo.current(0)
self.on_video_selected()
def on_video_selected(self):
idx = self.video_combo.current()
if idx < 0:
return
device = self.video_devices[idx]
self.log(f"Öffne Videogerät: {device['name']}")
try:
video_filter = bind_filter(device["moniker"])
decoder = get_decoder(video_filter)
available_mask = decoder.get_AvailableTVFormats()
self.standards = decode_video_standards(available_mask)
self.standard_combo["values"] = [f"{name} ({value})" for value, name in self.standards]
default = next((i for i, (_, name) in enumerate(self.standards) if name == "PAL_B"), None)
if default is None:
default = next((i for i, (_, name) in enumerate(self.standards) if name.startswith("PAL")), 0)
if self.standards:
self.standard_combo.current(default)
self.log(f"Verfügbare Videostandards: {available_mask}")
except Exception as e:
self.standards = []
self.standard_combo["values"] = []
self.log(f"Videostandards konnten nicht gelesen werden: {e}")
self.select_likely_crossbar(device["name"])
def select_likely_crossbar(self, device_name):
if not self.crossbars:
return
base_parts = [
p.lower()
for p in device_name.replace("Video Capture", "").replace("Capture", "").split()
if len(p) > 2
]
best_index = 0
best_score = -1
for i, item in enumerate(self.crossbars):
name = item["name"].lower()
score = sum(1 for part in base_parts if part in name)
if score > best_score:
best_index = i
best_score = score
self.crossbar_combo.current(best_index)
self.on_crossbar_selected()
def on_crossbar_selected(self):
idx = self.crossbar_combo.current()
if idx < 0:
return
item = self.crossbars[idx]
self.log(f"Öffne Crossbar: {item['name']}")
try:
crossbar_filter = bind_filter(item["moniker"])
crossbar = get_crossbar(crossbar_filter)
self.inputs, self.outputs = inspect_crossbar(crossbar)
self.input_combo["values"] = [
f"Input {i['index']}: {i['name']} ({i['physical_type']})"
for i in self.inputs
]
self.output_combo["values"] = [
f"Output {o['index']}: {o['name']} ({o['physical_type']})"
for o in self.outputs
]
input_default = next((i for i, item in enumerate(self.inputs) if item["physical_type"] == 3), 0)
output_default = next((i for i, item in enumerate(self.outputs) if item["physical_type"] == 12), 0)
if self.inputs:
self.input_combo.current(input_default)
if self.outputs:
self.output_combo.current(output_default)
except Exception as e:
self.inputs = []
self.outputs = []
self.input_combo["values"] = []
self.output_combo["values"] = []
self.log(f"Crossbar konnte nicht gelesen werden: {e}")
def build_config_from_ui(self):
video_idx = self.video_combo.current()
standard_idx = self.standard_combo.current()
if video_idx < 0:
raise RuntimeError("Kein Videogerät ausgewählt.")
if standard_idx < 0:
raise RuntimeError("Kein Videostandard ausgewählt.")
device_name = self.video_devices[video_idx]["name"]
standard_value, standard_name = self.standards[standard_idx]
cfg = {
"device_name": device_name,
"video_standard": {
"name": standard_name,
"value": standard_value,
},
"crossbar_name": None,
"input": None,
"output": None,
}
crossbar_idx = self.crossbar_combo.current()
input_idx = self.input_combo.current()
output_idx = self.output_combo.current()
if crossbar_idx >= 0 and input_idx >= 0 and output_idx >= 0:
cfg["crossbar_name"] = self.crossbars[crossbar_idx]["name"]
cfg["input"] = self.inputs[input_idx]
cfg["output"] = self.outputs[output_idx]
return cfg
def save_config(self):
try:
cfg = self.build_config_from_ui()
ensure_config_dir()
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
self.log(f"Konfiguration gespeichert: {self.config_path}")
messagebox.showinfo("Gespeichert", f"Konfiguration gespeichert:\n{self.config_path}")
except Exception as e:
messagebox.showerror("Fehler", str(e))
self.log(f"Fehler beim Speichern: {e}")
def save_and_apply(self):
self.save_config()
self.apply_config_file()
def apply_current_selection(self):
try:
cfg = self.build_config_from_ui()
self.apply_config(cfg)
except Exception as e:
messagebox.showerror("Fehler", str(e))
self.log(f"Fehler beim Anwenden: {e}")
def apply_config_file(self):
with open(self.config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
self.apply_config(cfg)
def apply_config(self, cfg):
device_name = cfg["device_name"]
standard_name = cfg["video_standard"]["name"]
standard_value = int(cfg["video_standard"]["value"])
self.log(f"Öffne Videogerät: {device_name}")
video_filter = get_filter_by_name(CLSID_VideoInputDeviceCategory, device_name)
decoder = get_decoder(video_filter)
try:
before = decoder.get_TVFormat()
self.log(f"Vorheriger Videostandard: {before}")
except Exception:
pass
decoder.put_TVFormat(standard_value)
self.log(f"Videostandard gesetzt: {standard_name} ({standard_value})")
try:
after = decoder.get_TVFormat()
self.log(f"Aktueller Videostandard: {after}")
except Exception:
pass
crossbar_name = cfg.get("crossbar_name")
input_cfg = cfg.get("input")
output_cfg = cfg.get("output")
if not crossbar_name or not input_cfg or not output_cfg:
self.log("Keine Crossbar-/Eingangskonfiguration vorhanden.")
return
self.log(f"Öffne Crossbar: {crossbar_name}")
crossbar_filter = get_filter_by_name(CLSID_AM_KSCATEGORY_CROSSBAR, crossbar_name)
crossbar = get_crossbar(crossbar_filter)
output_index = int(output_cfg["index"])
input_index = int(input_cfg["index"])
crossbar.CanRoute(output_index, input_index)
crossbar.Route(output_index, input_index)
self.log(f"Eingang gesetzt: Output {output_index} -> Input {input_index} ({input_cfg.get('name', 'unbekannt')})")
def load_config_into_ui(self):
try:
with open(self.config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
device_idx = find_index_by_name(self.video_devices, cfg.get("device_name"))
if device_idx >= 0:
self.video_combo.current(device_idx)
self.on_video_selected()
standard_cfg = cfg.get("video_standard", {})
standard_value = int(standard_cfg.get("value", -1))
for i, (value, _) in enumerate(self.standards):
if value == standard_value:
self.standard_combo.current(i)
break
crossbar_idx = find_index_by_name(self.crossbars, cfg.get("crossbar_name"))
if crossbar_idx >= 0:
self.crossbar_combo.current(crossbar_idx)
self.on_crossbar_selected()
input_cfg = cfg.get("input") or {}
output_cfg = cfg.get("output") or {}
for i, item in enumerate(self.inputs):
if item["index"] == input_cfg.get("index"):
self.input_combo.current(i)
break
for i, item in enumerate(self.outputs):
if item["index"] == output_cfg.get("index"):
self.output_combo.current(i)
break
self.log("Gespeicherte Konfiguration geladen.")
except Exception as e:
self.log(f"Konfiguration konnte nicht geladen werden: {e}")
def apply_config_headless(config_path):
with open(config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
device_name = cfg["device_name"]
standard_name = cfg["video_standard"]["name"]
standard_value = int(cfg["video_standard"]["value"])
print(f"Öffne Videogerät: {device_name}")
video_filter = get_filter_by_name(CLSID_VideoInputDeviceCategory, device_name)
decoder = get_decoder(video_filter)
try:
before = decoder.get_TVFormat()
print(f"Vorheriger Videostandard: {before}")
except Exception:
pass
decoder.put_TVFormat(standard_value)
print(f"Videostandard gesetzt: {standard_name} ({standard_value})")
try:
after = decoder.get_TVFormat()
print(f"Aktueller Videostandard: {after}")
except Exception:
pass
crossbar_name = cfg.get("crossbar_name")
input_cfg = cfg.get("input")
output_cfg = cfg.get("output")
if not crossbar_name or not input_cfg or not output_cfg:
print("Keine Crossbar-/Eingangskonfiguration vorhanden.")
return
print(f"Öffne Crossbar: {crossbar_name}")
crossbar_filter = get_filter_by_name(CLSID_AM_KSCATEGORY_CROSSBAR, crossbar_name)
crossbar = get_crossbar(crossbar_filter)
output_index = int(output_cfg["index"])
input_index = int(input_cfg["index"])
crossbar.CanRoute(output_index, input_index)
crossbar.Route(output_index, input_index)
print(f"Eingang gesetzt: Output {output_index} -> Input {input_index} ({input_cfg.get('name', 'unbekannt')})")
def main():
parser = argparse.ArgumentParser(description="GUI für DirectShow Video Grabber Konfiguration")
parser.add_argument("mode", nargs="?", default="", help="/config öffnet die Konfigurationsoberfläche")
parser.add_argument("--config-file", default=CONFIG_FILE, help="Pfad zur config.json")
args = parser.parse_args()
force_config = args.mode.lower() in ("/config", "config", "--config")
ensure_config_dir()
config_exists = os.path.exists(args.config_file)
# Normalstart:
# - config.json vorhanden und kein /config: nur anwenden und beenden
# - keine config.json oder /config: GUI öffnen
if config_exists and not force_config:
try:
apply_config_headless(args.config_file)
return
except Exception as e:
print(f"Fehler beim Anwenden der Konfiguration: {e}")
print("Starte Konfigurationsoberfläche.")
app = VideoGrabberGui(args.config_file, force_config=True)
app.mainloop()
return
app = VideoGrabberGui(args.config_file, force_config=True)
app.mainloop()
if __name__ == "__main__":
main()