"""Tkinter front end that runs hourly DICOM C-FIND study queries against a PACS.

The dialog loads connection profiles from ``config.txt``, queries the remote
node one hour at a time from the configured start date up to "now", and
writes the matching studies to a pipe-delimited text file in the current
working directory.
"""

import ast
import configparser
import csv
import logging
import os
import tkinter as tk
from datetime import datetime, timedelta
from threading import Event, Thread
from tkinter import messagebox, ttk

from pydicom.dataset import Dataset
from pynetdicom import AE, debug_logger, evt
from pynetdicom.sop_class import StudyRootQueryRetrieveInformationModelFind

# === Constants and Logging Setup ===
LOG_FILE = "dicom_query.log"
CONFIG_FILE = "config.txt"
LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

# Column order of the output file; the names double as DICOM keywords that
# are read from every C-FIND response identifier.
OUTPUT_COLUMNS = (
    'PatientID',
    'PatientName',
    'AccessionNumber',
    'StudyDate',
    'StudyInstanceUID',
    'StudyDescription',
    'ModalitiesInStudy',
    'NumberOfStudyRelatedInstances',
)

# C-FIND "pending" status codes: each one carries a matching identifier.
PENDING_STATUSES = (0xFF00, 0xFF01)

logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
debug_logger()


def load_all_profiles():
    """Return a ConfigParser populated from CONFIG_FILE.

    A missing file is not an error: ``ConfigParser.read`` skips it and the
    returned parser simply has no sections.
    """
    config = configparser.ConfigParser()
    config.read(CONFIG_FILE)
    return config


def save_all_profiles(config):
    """Write every profile held by *config* back to CONFIG_FILE."""
    with open(CONFIG_FILE, 'w') as f:
        config.write(f)


def generate_output_filename():
    """Return a timestamped output file name such as ``output_20240131_101500.txt``."""
    today = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"output_{today}.txt"


def create_cfind_dataset(start_datetime, end_datetime):
    """Build the study-level C-FIND identifier for one time window.

    The query matches ``StudyDate`` against the date of *start_datetime* and
    ``StudyTime`` against the range start..end. All other attributes are
    empty, i.e. requested as return keys.

    Args:
        start_datetime: Start of the window (datetime).
        end_datetime: End of the window (datetime).

    Returns:
        The populated pydicom ``Dataset``.
    """
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'
    ds.StudyDate = start_datetime.strftime("%Y%m%d")

    # Only a single StudyDate is queried, so a window that runs past midnight
    # (23:00 -> 00:00 next day) must end at the last second of the start day.
    # Formatting the raw end time would yield the inverted range
    # "230000-000000" and lose the final hour of every day.
    if end_datetime.date() != start_datetime.date():
        end_time = "235959"
    else:
        end_time = end_datetime.strftime('%H%M%S')
    ds.StudyTime = f"{start_datetime.strftime('%H%M%S')}-{end_time}"

    ds.PatientID = ''
    ds.PatientName = ''
    ds.AccessionNumber = ''
    ds.StudyInstanceUID = ''
    ds.StudyDescription = ''
    ds.ModalitiesInStudy = ''
    ds.NumberOfStudyRelatedInstances = ''
    return ds


def format_modalities(mod_raw):
    """Normalise a ModalitiesInStudy value to a sorted, comma-separated string.

    Accepts a plain string, a string holding a Python list literal
    (``"['CT', 'MR']"``), or any iterable of values. Entries are stripped,
    upper-cased, de-duplicated and sorted. A bracketed string that cannot be
    parsed is returned stripped but otherwise unchanged.
    """
    if not mod_raw:
        return ""

    if isinstance(mod_raw, str):
        text = mod_raw.strip()
        if text.startswith('[') and text.endswith(']'):
            try:
                parsed = ast.literal_eval(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return text
            items = parsed if isinstance(parsed, (list, tuple, set)) else [parsed]
        else:
            items = [text]
    else:
        # Multi-valued elements are sequences but not necessarily ``list``
        # instances, so iterate anything iterable instead of stringifying it.
        try:
            items = list(mod_raw)
        except TypeError:
            items = [mod_raw]

    normalised = (str(item).strip().upper() for item in items if item)
    return ", ".join(sorted({name for name in normalised if name}))


def build_output_row(identifier):
    """Return the output row (list of strings) for one C-FIND identifier.

    Attributes missing from *identifier* become empty strings.
    """
    row = []
    for keyword in OUTPUT_COLUMNS:
        value = getattr(identifier, keyword, '')
        if keyword == 'ModalitiesInStudy':
            row.append(format_modalities(value))
        else:
            row.append(str(value).strip())
    return row


class ConfigDialog:
    """Modal dialog that edits a connection profile and runs the C-FIND export.

    The query runs on a worker thread. The worker never touches widgets
    directly; UI work is scheduled onto the Tk event loop with ``after``.
    """

    def __init__(self, parent):
        """Build the dialog as a Toplevel of *parent* and load the first profile."""
        self.values = {}
        self.current_profile = tk.StringVar()
        self.log_level = tk.StringVar(value="INFO")
        self.status = tk.StringVar(value="Ready")
        self.record_count = tk.IntVar(value=0)
        self.pause_event = Event()
        self.stop_event = Event()
        self.pause_event.set()  # set == running, cleared == paused
        self._worker = None

        self.all_profiles = load_all_profiles()
        self.profile_names = self.all_profiles.sections()

        self.top = tk.Toplevel(parent)
        self.top.title("DICOM Query Configuration")
        self.top.protocol("WM_DELETE_WINDOW", self.on_cancel)

        tk.Label(self.top, text="Select Profile").grid(
            row=0, column=0, sticky=tk.W, padx=10, pady=5)
        # With no stored profiles a read-only list would make it impossible to
        # ever pick one, so allow typing a new profile name in that case only.
        self.profile_menu = ttk.Combobox(
            self.top,
            textvariable=self.current_profile,
            values=self.profile_names,
            state='readonly' if self.profile_names else 'normal',
        )
        self.profile_menu.grid(row=0, column=1, padx=10, pady=5)
        # ttk.Combobox announces a list selection through this virtual event.
        self.profile_menu.bind("<<ComboboxSelected>>", self.load_selected_profile)

        self.fields = ['RemoteAET', 'RemoteIP', 'RemotePort', 'LocalAET', 'StartDate']
        self.entries = {}
        for i, field in enumerate(self.fields):
            tk.Label(self.top, text=field).grid(
                row=i + 1, column=0, sticky=tk.W, padx=10, pady=5)
            entry = tk.Entry(self.top, width=30)
            entry.grid(row=i + 1, column=1, padx=10, pady=5)
            self.entries[field] = entry

        next_row = len(self.fields) + 1
        tk.Label(self.top, text="Logging Level").grid(
            row=next_row, column=0, sticky=tk.W, padx=10, pady=5)
        log_level_menu = ttk.Combobox(
            self.top,
            textvariable=self.log_level,
            values=list(LOG_LEVELS.keys()),
            state='readonly',
        )
        log_level_menu.grid(row=next_row, column=1, padx=10, pady=5)

        self.status_label = tk.Label(self.top, textvariable=self.status, fg="blue")
        self.status_label.grid(row=next_row + 1, column=0, columnspan=2, padx=10, pady=5)

        self.record_label = tk.Label(self.top, text="Records Written: 0")
        self.record_label.grid(row=next_row + 2, column=0, columnspan=2, padx=10, pady=5)

        button_frame = tk.Frame(self.top)
        button_frame.grid(row=next_row + 3, column=0, columnspan=2, pady=10)
        tk.Button(button_frame, text="Run Query", command=self.on_run).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="Pause", command=self.on_pause).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="Stop", command=self.on_stop).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="Cancel", command=self.on_cancel).pack(side=tk.RIGHT, padx=5)

        if self.profile_names:
            self.current_profile.set(self.profile_names[0])
            self.load_selected_profile()

        self.top.grab_set()

    def load_selected_profile(self, event=None):
        """Fill the entry fields from the currently selected profile.

        Fields missing from the profile are shown empty. *event* is accepted
        so the method can be used directly as a Tk event callback.
        """
        profile = self.current_profile.get()
        for field in self.fields:
            value = self.all_profiles.get(profile, field, fallback='')
            self.entries[field].delete(0, tk.END)
            self.entries[field].insert(0, value)

    def on_run(self):
        """Save the edited profile and start the query on a worker thread."""
        if self._worker is not None and self._worker.is_alive():
            # A second worker would share the pause/stop events and the
            # record counter with the first one.
            self.update_status("Query already running.")
            return

        profile = self.current_profile.get()
        if not profile:
            messagebox.showerror("Error", "Please select a profile.")
            return

        if profile not in self.all_profiles:
            self.all_profiles.add_section(profile)

        for field in self.fields:
            value = self.entries[field].get().strip()
            self.all_profiles.set(profile, field, value)
            self.values[field] = value

        save_all_profiles(self.all_profiles)
        logging.getLogger().setLevel(LOG_LEVELS[self.log_level.get()])
        self.values['log_level'] = self.log_level.get()

        self.stop_event.clear()
        self.pause_event.set()
        self._worker = Thread(target=self.run_query)
        self._worker.start()

    def on_pause(self):
        """Toggle between paused and running."""
        if self.pause_event.is_set():
            self.pause_event.clear()
            self.update_status("Paused")
        else:
            self.pause_event.set()
            self.update_status("Resumed")

    def on_stop(self):
        """Ask the worker to stop after the response it is handling."""
        self.stop_event.set()
        # A paused worker is blocked in pause_event.wait(); wake it so it can
        # see the stop request.
        self.pause_event.set()
        self.update_status("Stopping...")

    def on_cancel(self):
        """Stop any running query and close the dialog."""
        self.stop_event.set()
        self.pause_event.set()
        self.top.destroy()

    def update_status(self, message):
        """Show *message* in the status line; callable from any thread."""
        self._call_in_ui(self._set_status, message)

    def _set_status(self, message):
        """Set the status text (runs on the Tk event loop)."""
        self.status.set(message)

    def update_record_count(self):
        """Refresh the "Records Written" label from the record counter."""
        count = self.record_count.get()
        self.record_label.config(text=f"Records Written: {count}")

    def _increment_record_count(self):
        """Count one more written record and refresh the label (Tk event loop)."""
        self.record_count.set(self.record_count.get() + 1)
        self.update_record_count()

    def _call_in_ui(self, func, *args):
        """Schedule ``func(*args)`` on the Tk event loop.

        Used by the worker thread for every widget update and dialog. If the
        window or the event loop is already gone the request is dropped.
        """
        try:
            self.top.after(0, self._dispatch, func, args)
        except (tk.TclError, RuntimeError):
            logging.debug("UI no longer available; dropped call to %r", func)

    def _dispatch(self, func, args):
        """Run a scheduled UI call unless the dialog was closed meanwhile."""
        if self.top.winfo_exists():
            func(*args)

    def _finish(self, output_path):
        """Report success to the user and close the dialog (Tk event loop)."""
        messagebox.showinfo("Success", f"C-FIND complete.\nResults saved to:\n{output_path}")
        self.top.destroy()

    def run_query(self):
        """Worker-thread entry point: run the export and report any failure.

        Reads the connection settings stored in ``self.values`` by ``on_run``.
        """
        try:
            self._run_query()
        except Exception:
            # Top-level boundary of the worker thread: without this an
            # unexpected error would only reach stderr and the dialog would
            # stay on its last status forever.
            logging.exception("Unhandled error in DICOM query worker.")
            self.update_status("Query failed.")
            self._call_in_ui(
                messagebox.showerror, "DICOM Error",
                "Query failed unexpectedly. See the log file for details.")

    def _run_query(self):
        """Validate the settings, associate, and query hour by hour."""
        config = self.values
        self.update_status("Initializing DICOM query...")

        # Validate user input before opening a network association so that a
        # typo never leaves an association behind.
        try:
            start_date = datetime.strptime(config['StartDate'], '%Y-%m-%d')
        except ValueError:
            logging.error("Invalid start date %r; expected YYYY-MM-DD.", config['StartDate'])
            self.update_status("Invalid start date.")
            self._call_in_ui(
                messagebox.showerror, "Date Error",
                "Start date must use the format YYYY-MM-DD.")
            return

        try:
            port = int(config['RemotePort'])
        except ValueError:
            logging.error("Invalid remote port %r.", config['RemotePort'])
            self.update_status("Invalid remote port.")
            self._call_in_ui(
                messagebox.showerror, "Configuration Error",
                "Remote port must be a whole number.")
            return

        now = datetime.now()
        if start_date >= now:
            logging.error("Start date %s is in the future. Aborting query.", start_date)
            self.update_status("Invalid start date.")
            self._call_in_ui(
                messagebox.showerror, "Date Error",
                "Start date must be earlier than today.")
            return

        ae = AE(ae_title=config['LocalAET'])
        ae.add_requested_context(StudyRootQueryRetrieveInformationModelFind)

        handlers = [
            (evt.EVT_ACCEPTED, lambda x: logging.info("Association accepted.")),
            (evt.EVT_REJECTED, lambda x: logging.warning("Association rejected.")),
            (evt.EVT_RELEASED, lambda x: logging.info("Association released.")),
            (evt.EVT_ABORTED, lambda x: logging.error("Association aborted.")),
        ]

        assoc = ae.associate(
            config['RemoteIP'],
            port,
            ae_title=config['RemoteAET'],
            evt_handlers=handlers
        )

        if not assoc.is_established:
            logging.error("DICOM Association failed.")
            self.update_status("Association failed.")
            self._call_in_ui(
                messagebox.showerror, "DICOM Error", "Association to PACS failed.")
            return

        output_path = os.path.join(os.getcwd(), generate_output_filename())
        logging.info("Starting C-FIND loop from %s to %s", start_date, now)

        try:
            # buffering=1: line buffered, so rows reach the file as written.
            with open(output_path, 'w', newline='', buffering=1) as f:
                writer = csv.writer(f, delimiter='|')
                writer.writerow(OUTPUT_COLUMNS)

                window_start = start_date
                while window_start < now and not self.stop_event.is_set():
                    self.pause_event.wait()
                    if self.stop_event.is_set():
                        break
                    if not assoc.is_established:
                        # Every further request would fail the same way.
                        logging.error(
                            "Association lost; stopping C-FIND loop at %s.", window_start)
                        break
                    window_end = window_start + timedelta(hours=1)
                    self._query_window(assoc, writer, window_start, window_end)
                    window_start = window_end
        finally:
            # Release on every exit path, including a failure to open the
            # output file.
            if assoc.is_established:
                assoc.release()

        self.update_status("Query complete.")
        logging.info("C-FIND complete. Results saved to: %s", output_path)
        self._call_in_ui(self._finish, output_path)

    def _query_window(self, assoc, writer, window_start, window_end):
        """Send one C-FIND for the given window and write its matches.

        Errors are logged and swallowed so one failing window or record does
        not abort the whole export.
        """
        ds = create_cfind_dataset(window_start, window_end)
        self.update_status(f"Querying: {window_start.strftime('%Y-%m-%d %H:%M')}")

        try:
            responses = assoc.send_c_find(ds, StudyRootQueryRetrieveInformationModelFind)
            for status, identifier in responses:
                if self.stop_event.is_set():
                    break
                self.pause_event.wait()

                if not (status and status.Status in PENDING_STATUSES and identifier):
                    continue

                try:
                    writer.writerow(build_output_row(identifier))
                except Exception:
                    logging.exception("Error writing to output file.")
                else:
                    self._call_in_ui(self._increment_record_count)
        except Exception:
            logging.exception(
                "Exception during C-FIND from %s to %s", window_start, window_end)


# === Main ===
if __name__ == "__main__":
    root = tk.Tk()
    root.withdraw()
    dialog = ConfigDialog(root)

    def _quit_when_closed(event):
        """End the event loop once the dialog itself is destroyed."""
        # <Destroy> also arrives here for every child widget of the dialog.
        if event.widget is dialog.top:
            root.quit()

    # The root window is hidden, so nothing else would ever stop mainloop()
    # after the dialog closes.
    dialog.top.bind("<Destroy>", _quit_when_closed)
    root.mainloop()