# Files
# dicom_utilities/archive_discovery.py
# 2025-05-14 16:48:56 +00:00
#
# 226 lines
# 9.2 KiB
# Python

import tkinter as tk
from tkinter import messagebox, ttk
from datetime import datetime, timedelta
import os
import csv
import logging
import configparser
from threading import Thread
from pydicom.dataset import Dataset
from pynetdicom import AE, evt, debug_logger
from pynetdicom.sop_class import StudyRootQueryRetrieveInformationModelFind
import ast
# === Constants and Logging Setup ===
# File that receives everything logged through the root logger.
LOG_FILE = "dicom_query.log"
# INI-style file read and written by the profile helpers.
CONFIG_FILE = "config.txt"
# Level names offered in the dialog, mapped to the numeric levels of `logging`.
LOG_LEVELS = {
    name: getattr(logging, name)
    for name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
}

# The root logger starts at INFO; the dialog may change the level later.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
    filename=LOG_FILE,
)

# NOTE(review): this switches on pynetdicom's debug logging at import time,
# independently of the level picked in the dialog -- confirm that this
# verbosity (and what ends up in the log) is intended.
debug_logger()
def load_all_profiles():
    """Return a ``ConfigParser`` populated from ``CONFIG_FILE``.

    Each section of the file is one connection profile.  ``ConfigParser.read``
    skips files it cannot open, so a missing config file yields a parser with
    no sections rather than an error.
    """
    parser = configparser.ConfigParser()
    parser.read(CONFIG_FILE)
    return parser
def save_all_profiles(config):
    """Overwrite ``CONFIG_FILE`` with the sections held by *config*.

    Args:
        config: The ``ConfigParser`` whose contents should be persisted.
    """
    with open(CONFIG_FILE, 'w') as handle:
        config.write(handle)
def generate_output_filename():
    """Return ``output_<YYYYmmdd>_<HHMMSS>.txt`` stamped with the current local time."""
    # datetime's format spec is handed to strftime, so this is the same stamp
    # as an explicit strftime("%Y%m%d_%H%M%S") call.
    return f"output_{datetime.now():%Y%m%d_%H%M%S}.txt"
def create_cfind_dataset(start_datetime, end_datetime):
    """Build the STUDY-level C-FIND identifier for one time window.

    The query matches on the calendar day of *start_datetime* (``StudyDate``)
    and on the time range from *start_datetime* to *end_datetime*
    (``StudyTime``).  The remaining attributes are sent empty so that the
    peer returns them for every match.

    Args:
        start_datetime: Beginning of the window; supplies the study date and
            the lower bound of the time range.
        end_datetime: End of the window; supplies the upper bound of the
            time range.

    Returns:
        A ``Dataset`` ready to be passed to ``send_c_find``.
    """
    range_start = start_datetime.strftime('%H%M%S')
    if end_datetime.date() > start_datetime.date():
        # StudyDate only names the start day.  A window that runs into the
        # next day (e.g. 23:00 -> 00:00) would otherwise produce the inverted
        # range "230000-000000", which cannot match anything and silently
        # drops the last part of the day.  Clamp to the end of the start day.
        range_end = '235959'
    else:
        range_end = end_datetime.strftime('%H%M%S')

    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'
    ds.StudyDate = start_datetime.strftime("%Y%m%d")
    ds.StudyTime = f"{range_start}-{range_end}"
    # Empty values: return keys requested from the peer.
    ds.PatientID = ''
    ds.PatientName = ''
    ds.AccessionNumber = ''
    ds.StudyInstanceUID = ''
    ds.ModalitiesInStudy = ''
    ds.NumberOfStudyRelatedInstances = ''
    return ds
class ConfigDialog:
    """Modal dialog that edits a connection profile and runs an hourly C-FIND sweep.

    The dialog shows the profiles found in the config file, lets the user edit
    the connection fields and, on "Run Query", saves the profile and starts
    ``run_query`` on a worker thread.  The worker does not show message boxes
    or destroy the window itself: every user-interface action is scheduled
    onto the Tk event loop with ``after``.

    Attributes are documented where they are assigned in ``__init__``.
    """

    # Header row of the pipe-delimited output file; also the identifier
    # attributes that are written, in this order, for every match.
    OUTPUT_COLUMNS = (
        'PatientID', 'PatientName', 'AccessionNumber', 'StudyDate',
        'StudyInstanceUID', 'ModalitiesInStudy', 'NumberOfStudyRelatedInstances',
    )
    # C-FIND "Pending" status codes: responses that carry a matching identifier.
    PENDING_STATUSES = (0xFF00, 0xFF01)

    def __init__(self, parent):
        """Build the dialog as a ``Toplevel`` of *parent* and grab input focus.

        Args:
            parent: The Tk widget that owns the dialog window.
        """
        # Field values captured by on_run and read by run_query.
        self.values = {}
        # Set by on_cancel so a running query stops and stays silent.
        self._cancelled = False
        # The thread started by on_run, if any.
        self._worker = None
        self.current_profile = tk.StringVar()
        self.log_level = tk.StringVar(value="INFO")
        self.status = tk.StringVar(value="Ready")
        self.all_profiles = load_all_profiles()
        self.profile_names = self.all_profiles.sections()

        self.top = tk.Toplevel(parent)
        self.top.title("DICOM Query Configuration")
        # Closing the window from the title bar behaves like "Cancel".
        self.top.protocol("WM_DELETE_WINDOW", self.on_cancel)

        tk.Label(self.top, text="Select Profile").grid(row=0, column=0, sticky=tk.W, padx=10, pady=5)
        self.profile_menu = ttk.Combobox(self.top, textvariable=self.current_profile, values=self.profile_names, state='readonly')
        self.profile_menu.grid(row=0, column=1, padx=10, pady=5)
        self.profile_menu.bind("<<ComboboxSelected>>", self.load_selected_profile)

        # One text entry per profile option, in display order.
        self.fields = ['RemoteAET', 'RemoteIP', 'RemotePort', 'LocalAET', 'StartDate']
        self.entries = {}
        for i, field in enumerate(self.fields):
            tk.Label(self.top, text=field).grid(row=i+1, column=0, sticky=tk.W, padx=10, pady=5)
            entry = tk.Entry(self.top, width=30)
            entry.grid(row=i+1, column=1, padx=10, pady=5)
            self.entries[field] = entry

        next_row = len(self.fields) + 1
        tk.Label(self.top, text="Logging Level").grid(row=next_row, column=0, sticky=tk.W, padx=10, pady=5)
        log_level_menu = ttk.Combobox(self.top, textvariable=self.log_level, values=list(LOG_LEVELS.keys()), state='readonly')
        log_level_menu.grid(row=next_row, column=1, padx=10, pady=5)

        self.status_label = tk.Label(self.top, textvariable=self.status, fg="blue")
        self.status_label.grid(row=next_row+1, column=0, columnspan=2, padx=10, pady=5)

        button_frame = tk.Frame(self.top)
        button_frame.grid(row=next_row+2, column=0, columnspan=2, pady=10)
        # Kept as an attribute so it can be disabled while a query is running.
        self.run_button = tk.Button(button_frame, text="Run Query", command=self.on_run)
        self.run_button.pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="Cancel", command=self.on_cancel).pack(side=tk.RIGHT, padx=5)

        if self.profile_names:
            self.current_profile.set(self.profile_names[0])
            self.load_selected_profile()
        self.top.grab_set()

    def load_selected_profile(self, event=None):
        """Fill the entry widgets from the currently selected profile.

        Args:
            event: The Tk event when used as a binding callback; unused.
        """
        profile = self.current_profile.get()
        for field in self.fields:
            value = self.all_profiles.get(profile, field, fallback='')
            self.entries[field].delete(0, tk.END)
            self.entries[field].insert(0, value)

    def on_run(self):
        """Save the edited profile and start the query on a worker thread."""
        if self._worker is not None and self._worker.is_alive():
            # A second click must not start a concurrent query.
            return
        profile = self.current_profile.get()
        if not profile:
            messagebox.showerror("Error", "Please select a profile.")
            return
        # NOTE(review): the profile combobox is read-only, so this branch looks
        # reachable only if current_profile is set programmatically -- confirm
        # whether creating new profiles from the dialog was intended.
        if profile not in self.all_profiles:
            self.all_profiles.add_section(profile)
        for field in self.fields:
            value = self.entries[field].get().strip()
            self.all_profiles.set(profile, field, value)
            self.values[field] = value
        save_all_profiles(self.all_profiles)
        logging.getLogger().setLevel(LOG_LEVELS[self.log_level.get()])
        self.values['log_level'] = self.log_level.get()

        self.run_button.config(state=tk.DISABLED)
        self._worker = Thread(target=self.run_query)
        self._worker.start()

    def on_cancel(self):
        """Close the dialog and tell a running query to stop."""
        self._cancelled = True
        self.top.destroy()

    def update_status(self, message):
        """Show *message* in the status line; may be called from the worker thread."""
        self._call_in_ui(self._set_status, message)

    def _set_status(self, message):
        """Set the status text (runs on the Tk event loop)."""
        self.status.set(message)

    def _call_in_ui(self, func, *args):
        """Schedule ``func(*args)`` on the Tk event loop instead of calling it here."""
        try:
            self.top.after(0, func, *args)
        except (tk.TclError, RuntimeError):
            # The application is gone or the event loop is no longer running;
            # there is nobody left to show the update to.
            logging.debug("UI update dropped: the dialog is no longer available.")

    def _report_failure(self, status_message, title, message):
        """Update the status line and show an error box from the Tk event loop."""
        self.update_status(status_message)
        self._call_in_ui(self._show_failure, title, message)

    def _show_failure(self, title, message):
        """Show an error box and re-enable "Run Query" (runs on the Tk event loop)."""
        if self._cancelled:
            return
        messagebox.showerror(title, message)
        if not self._cancelled and self.top.winfo_exists():
            self.run_button.config(state=tk.NORMAL)

    def _show_success(self, output_path):
        """Announce the result file and close the dialog (runs on the Tk event loop)."""
        if self._cancelled:
            return
        messagebox.showinfo("Success", f"C-FIND complete.\nResults saved to:\n{output_path}")
        self.top.destroy()

    @staticmethod
    def _parse_port(text):
        """Return *text* as a TCP port number.

        Raises:
            ValueError: If *text* is not an integer between 1 and 65535.
        """
        try:
            port = int(text)
        except (TypeError, ValueError):
            raise ValueError(f"RemotePort must be a whole number, got {text!r}.") from None
        if not 1 <= port <= 65535:
            raise ValueError(f"RemotePort must be between 1 and 65535, got {port}.")
        return port

    @staticmethod
    def _parse_start_date(text):
        """Return *text* (``YYYY-MM-DD``) as a ``datetime`` at midnight.

        Raises:
            ValueError: If *text* is not a valid date in that format.
        """
        try:
            return datetime.strptime(text, '%Y-%m-%d')
        except (TypeError, ValueError):
            raise ValueError(f"StartDate must be a date in YYYY-MM-DD format, got {text!r}.") from None

    @staticmethod
    def _format_modalities(mod_raw):
        """Return the modalities as a sorted, de-duplicated, upper-case list.

        Accepts a single string, the string form of a Python list
        (``"['CT', 'SR']"``) or any iterable of values.  Treating every
        non-string iterable as a sequence matters because a multi-valued
        element is not necessarily a ``list``; formatting it as one value
        would write its repr into the output file.

        Args:
            mod_raw: The raw ``ModalitiesInStudy`` value, possibly empty.

        Returns:
            The modalities joined with ``", "``, or ``""`` when there are none.
        """
        if not mod_raw:
            return ''
        if isinstance(mod_raw, str):
            text = mod_raw.strip()
            if text.startswith('[') and text.endswith(']'):
                try:
                    # literal_eval only evaluates literals, never arbitrary code.
                    items = ast.literal_eval(text)
                except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                    # Not a real list literal: keep the text as it was received.
                    return text
            else:
                items = [text]
        else:
            try:
                items = list(mod_raw)
            except TypeError:
                items = [mod_raw]
        return ", ".join(sorted({str(item).strip().upper() for item in items if item}))

    @classmethod
    def _build_row(cls, identifier):
        """Return one output row for *identifier*, ordered like ``OUTPUT_COLUMNS``.

        Attributes missing from *identifier* become empty strings.
        """
        row = []
        for column in cls.OUTPUT_COLUMNS:
            value = getattr(identifier, column, '')
            if column == 'ModalitiesInStudy':
                row.append(cls._format_modalities(value))
            else:
                row.append(str(value).strip())
        return row

    def _sweep(self, assoc, writer, start_date, now):
        """Send one C-FIND per hour from *start_date* up to *now*.

        Args:
            assoc: The established association to query over.
            writer: ``csv`` writer that receives one row per match.
            start_date: Beginning of the first window.
            now: Time at which the sweep stops.

        Returns:
            True when every window was queried; False when the sweep stopped
            early because the dialog was cancelled or the association was lost.
        """
        window_start = start_date
        while window_start < now:
            if self._cancelled:
                return False
            window_end = window_start + timedelta(hours=1)
            ds = create_cfind_dataset(window_start, window_end)
            self.update_status(f"Querying: {window_start.strftime('%Y-%m-%d %H:%M')}")
            try:
                responses = assoc.send_c_find(ds, StudyRootQueryRetrieveInformationModelFind)
                for status, identifier in responses:
                    if status and status.Status in self.PENDING_STATUSES and identifier:
                        writer.writerow(self._build_row(identifier))
                    elif status:
                        logging.debug("C-FIND response: 0x%04X", status.Status)
                    else:
                        logging.warning(
                            "C-FIND from %s to %s returned no status.", window_start, window_end
                        )
            except Exception:
                # Best effort: one failing window must not end the whole sweep.
                logging.exception("Exception during C-FIND from %s to %s", window_start, window_end)
            if not assoc.is_established:
                # Every further request would fail the same way.
                logging.error("Association lost after window starting %s; stopping.", window_start)
                return False
            window_start = window_end
        return True

    def run_query(self):
        """Run the hourly C-FIND sweep described by ``self.values``.

        Intended to run on a worker thread.  Reads the connection settings
        stored by ``on_run``, associates with the peer, writes every match to
        a new pipe-delimited file in the current working directory and
        releases the association.  All feedback to the user is scheduled onto
        the Tk event loop.
        """
        config = self.values
        self.update_status("Initializing DICOM query...")

        # Validate before opening a connection so that no early return can
        # leave an association behind.
        try:
            port = self._parse_port(config['RemotePort'])
            start_date = self._parse_start_date(config['StartDate'])
        except ValueError as exc:
            logging.error("Invalid query settings: %s", exc)
            self._report_failure("Invalid settings.", "Input Error", str(exc))
            return
        now = datetime.now()
        if start_date >= now:
            logging.error("Start date %s is in the future. Aborting query.", start_date)
            self._report_failure("Invalid start date.", "Date Error", "Start date must be earlier than today.")
            return

        handlers = [
            (evt.EVT_ACCEPTED, lambda x: logging.info("Association accepted.")),
            (evt.EVT_REJECTED, lambda x: logging.warning("Association rejected.")),
            (evt.EVT_RELEASED, lambda x: logging.info("Association released.")),
            (evt.EVT_ABORTED, lambda x: logging.error("Association aborted.")),
        ]
        try:
            ae = AE(ae_title=config['LocalAET'])
            ae.add_requested_context(StudyRootQueryRetrieveInformationModelFind)
            assoc = ae.associate(
                config['RemoteIP'],
                port,
                ae_title=config['RemoteAET'],
                evt_handlers=handlers
            )
        except Exception:
            # Top-level boundary of the worker thread: report instead of
            # letting the thread die with an unseen traceback.
            logging.exception("Could not start the DICOM association.")
            self._report_failure("Association failed.", "DICOM Error", "Association to PACS failed.")
            return
        if not assoc.is_established:
            logging.error("DICOM Association failed.")
            self._report_failure("Association failed.", "DICOM Error", "Association to PACS failed.")
            return

        output_path = os.path.join(os.getcwd(), generate_output_filename())
        logging.info("Starting C-FIND loop from %s to %s", start_date, now)
        try:
            # UTF-8 so that non-ASCII patient names cannot fail to encode.
            with open(output_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f, delimiter='|')
                writer.writerow(self.OUTPUT_COLUMNS)
                finished = self._sweep(assoc, writer, start_date, now)
        except OSError:
            logging.exception("Could not write results to %s", output_path)
            self._report_failure(
                "Could not write output file.",
                "File Error",
                f"Could not write results to:\n{output_path}",
            )
            return
        finally:
            if assoc.is_established:
                assoc.release()

        if self._cancelled:
            logging.info("Query cancelled. Partial results saved to: %s", output_path)
            return
        if not finished:
            self._report_failure(
                "Query stopped early.",
                "DICOM Error",
                f"The association was lost before the query finished.\nPartial results saved to:\n{output_path}",
            )
            return
        self.update_status("Query complete.")
        logging.info("C-FIND complete. Results saved to: %s", output_path)
        self._call_in_ui(self._show_success, output_path)
# === Main ===
if __name__ == "__main__":
    # The root window only hosts the dialog, so it stays hidden.
    root = tk.Tk()
    root.withdraw()
    dialog = ConfigDialog(root)

    def _quit_when_dialog_closes(event):
        """Leave the main loop once the dialog window itself is destroyed."""
        # <Destroy> is also delivered for the dialog's child widgets; only the
        # dialog's own window ends the application.
        if event.widget is dialog.top:
            root.quit()

    # Without this the hidden root keeps the event loop alive after the dialog
    # closes and the process never exits.
    dialog.top.bind("<Destroy>", _quit_when_dialog_closes)
    root.mainloop()