236 lines
9.5 KiB
Python
236 lines
9.5 KiB
Python
import tkinter as tk
|
|
from tkinter import messagebox, ttk
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import csv
|
|
import logging
|
|
import configparser
|
|
from threading import Thread
|
|
from pydicom.dataset import Dataset
|
|
from pynetdicom import AE, evt, debug_logger
|
|
from pynetdicom.sop_class import StudyRootQueryRetrieveInformationModelFind
|
|
import ast
|
|
|
|
# === Constants and Logging Setup ===
LOG_FILE = "dicom_query.log"
CONFIG_FILE = "config.txt"

# Maps the level names offered in the dialog to the numeric constants of the
# logging module, ordered from most to least verbose.
LOG_LEVELS = {
    level_name: getattr(logging, level_name)
    for level_name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
}

# The root logger writes to LOG_FILE; INFO is only the starting level, the
# dialog changes it when a query is launched.
logging.basicConfig(
    filename=LOG_FILE,
    level=LOG_LEVELS["INFO"],
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# pynetdicom helper imported above; presumably turns on the library's own
# debug output -- see the pynetdicom documentation for its exact effect.
debug_logger()
|
|
|
|
def load_all_profiles():
    """Load every connection profile stored in CONFIG_FILE.

    Returns:
        A ``configparser.ConfigParser`` populated from CONFIG_FILE. Each
        section is one profile. ``ConfigParser.read`` skips files it cannot
        open, so a missing file simply yields a parser without sections.
    """
    profiles = configparser.ConfigParser()
    profiles.read(CONFIG_FILE)
    return profiles
|
|
|
|
def save_all_profiles(config):
    """Persist all profiles to CONFIG_FILE, replacing its previous contents.

    Args:
        config: The ``ConfigParser`` holding the profiles to write.
    """
    with open(CONFIG_FILE, mode='w') as config_handle:
        config.write(config_handle)
|
|
|
|
def generate_output_filename():
    """Return a results file name stamped with the current local time.

    The name has the form ``output_YYYYMMDD_HHMMSS.txt``.
    """
    timestamp = format(datetime.now(), "%Y%m%d_%H%M%S")
    return "output_" + timestamp + ".txt"
|
|
|
|
def create_cfind_dataset(start_datetime, end_datetime):
    """Build the Study-level C-FIND identifier for one time window.

    The query matches on a single StudyDate (the date of *start_datetime*)
    combined with a StudyTime range, and asks for the patient, accession,
    UID, modality and instance-count attributes by sending them empty.

    Args:
        start_datetime: ``datetime`` marking the start of the window.
        end_datetime: ``datetime`` marking the end of the window.

    Returns:
        A ``Dataset`` ready to be passed to ``send_c_find``.

    Note:
        Only one StudyDate is sent, so a window that runs into a later day
        is truncated at the end of the start day; the caller has to cover
        the remainder with a further query.
    """
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'
    ds.StudyDate = start_datetime.strftime("%Y%m%d")

    range_start = start_datetime.strftime('%H%M%S')
    if end_datetime.date() > start_datetime.date():
        # The window crosses midnight (e.g. 23:00 -> 00:00 next day). Formatting
        # the end time would give an inverted range such as "230000-000000",
        # so use an open-ended range, which DICOM range matching defines as
        # "this time and everything after it" within the matched StudyDate.
        ds.StudyTime = f"{range_start}-"
    else:
        ds.StudyTime = f"{range_start}-{end_datetime.strftime('%H%M%S')}"

    # Empty values are return keys: the SCP is asked to fill them in.
    ds.PatientID = ''
    ds.PatientName = ''
    ds.AccessionNumber = ''
    ds.StudyInstanceUID = ''
    ds.ModalitiesInStudy = ''
    ds.NumberOfStudyRelatedInstances = ''
    return ds
|
|
|
|
class ConfigDialog:
    """Modal dialog that edits a connection profile and runs a C-FIND sweep.

    The dialog loads the stored profiles, lets the user edit the fields of the
    selected one and, on "Run Query", saves the profile and starts a worker
    thread. The worker queries the remote node at Study level one hour at a
    time, from StartDate until the moment the query started, and writes every
    match to a pipe-delimited text file in the current working directory.

    Threading: the worker never touches widgets or message boxes directly.
    Every UI action it needs is scheduled with ``after`` so that it executes
    in the Tk event loop.
    """

    def __init__(self, parent):
        """Build the dialog as a child of *parent* and make it modal."""
        self.values = {}
        self.current_profile = tk.StringVar()
        self.log_level = tk.StringVar(value="INFO")
        self.status = tk.StringVar(value="Ready")
        self.all_profiles = load_all_profiles()
        self.profile_names = self.all_profiles.sections()

        # Coordination flags shared with the worker thread.
        self._running = False    # a query is in progress
        self._cancelled = False  # the dialog was closed; the worker must stop

        self.top = tk.Toplevel(parent)
        self.top.title("DICOM Query Configuration")
        self.top.protocol("WM_DELETE_WINDOW", self.on_cancel)

        tk.Label(self.top, text="Select Profile").grid(row=0, column=0, sticky=tk.W, padx=10, pady=5)
        self.profile_menu = ttk.Combobox(self.top, textvariable=self.current_profile, values=self.profile_names, state='readonly')
        self.profile_menu.grid(row=0, column=1, padx=10, pady=5)
        self.profile_menu.bind("<<ComboboxSelected>>", self.load_selected_profile)

        # One labelled entry per profile field, in rows 1..len(fields).
        self.fields = ['RemoteAET', 'RemoteIP', 'RemotePort', 'LocalAET', 'StartDate']
        self.entries = {}
        for i, field in enumerate(self.fields):
            tk.Label(self.top, text=field).grid(row=i+1, column=0, sticky=tk.W, padx=10, pady=5)
            entry = tk.Entry(self.top, width=30)
            entry.grid(row=i+1, column=1, padx=10, pady=5)
            self.entries[field] = entry

        tk.Label(self.top, text="Logging Level").grid(row=len(self.fields)+1, column=0, sticky=tk.W, padx=10, pady=5)
        log_level_menu = ttk.Combobox(self.top, textvariable=self.log_level, values=list(LOG_LEVELS.keys()), state='readonly')
        log_level_menu.grid(row=len(self.fields)+1, column=1, padx=10, pady=5)

        self.status_label = tk.Label(self.top, textvariable=self.status, fg="blue")
        self.status_label.grid(row=len(self.fields)+2, column=0, columnspan=2, padx=10, pady=5)

        button_frame = tk.Frame(self.top)
        button_frame.grid(row=len(self.fields)+3, column=0, columnspan=2, pady=10)
        # Kept as an attribute so it can be disabled while a query is running.
        self.run_button = tk.Button(button_frame, text="Run Query", command=self.on_run)
        self.run_button.pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="Cancel", command=self.on_cancel).pack(side=tk.RIGHT, padx=5)

        if self.profile_names:
            self.current_profile.set(self.profile_names[0])
            self.load_selected_profile()

        self.top.grab_set()

    def load_selected_profile(self, event=None):
        """Fill the entry widgets from the currently selected profile.

        Args:
            event: Unused; present so the method can serve as a Tk binding.
        """
        profile = self.current_profile.get()
        for field in self.fields:
            value = self.all_profiles.get(profile, field, fallback='')
            self.entries[field].delete(0, tk.END)
            self.entries[field].insert(0, value)

    @staticmethod
    def _parse_settings(values):
        """Parse the port and start date out of a mapping of field strings.

        Args:
            values: Mapping holding at least 'RemotePort' and 'StartDate'.

        Returns:
            Tuple ``(port, start_date)``: an ``int`` and a ``datetime``.

        Raises:
            ValueError: With a user-facing message when either field is
                malformed or the port is outside 1..65535.
        """
        try:
            port = int(values.get('RemotePort', ''))
        except ValueError:
            raise ValueError("RemotePort must be a whole number.") from None
        if not 0 < port < 65536:
            raise ValueError("RemotePort must be between 1 and 65535.")
        try:
            start_date = datetime.strptime(values.get('StartDate', ''), '%Y-%m-%d')
        except ValueError:
            raise ValueError("StartDate must use the YYYY-MM-DD format.") from None
        return port, start_date

    @staticmethod
    def _format_modalities(mod_raw):
        """Return the modalities as a sorted, de-duplicated, comma-separated string.

        Args:
            mod_raw: The ModalitiesInStudy value: a single string, the text of
                a Python list literal, or any iterable of values.

        Returns:
            Upper-cased modality codes joined with ", "; if the value cannot
            be interpreted, its stripped string form.
        """
        try:
            if isinstance(mod_raw, str):
                if mod_raw.startswith('[') and mod_raw.endswith(']'):
                    mod_list = ast.literal_eval(mod_raw)
                else:
                    mod_list = [mod_raw] if mod_raw else []
            else:
                # Multi-valued elements arrive as a sequence type that is not
                # a list subclass, so accept any iterable rather than testing
                # for list; otherwise its repr ("['CT', 'SR']") gets written.
                try:
                    mod_list = list(mod_raw)
                except TypeError:
                    mod_list = [mod_raw] if mod_raw else []
            return ", ".join(sorted({str(m).strip().upper() for m in mod_list if m}))
        except Exception:
            # Best effort: an odd value must not cost the whole row.
            return str(mod_raw).strip()

    def on_run(self):
        """Validate the form, save the profile and start the query thread."""
        if self._running:
            return  # ignore repeated clicks while a query is in progress

        profile = self.current_profile.get()
        if not profile:
            messagebox.showerror("Error", "Please select a profile.")
            return

        values = {field: self.entries[field].get().strip() for field in self.fields}
        # Reject malformed input here, on the UI thread, instead of letting it
        # blow up inside the worker where nobody would see the exception.
        try:
            self._parse_settings(values)
        except ValueError as exc:
            messagebox.showerror("Error", str(exc))
            return

        if profile not in self.all_profiles:
            self.all_profiles.add_section(profile)
        for field, value in values.items():
            self.all_profiles.set(profile, field, value)
            self.values[field] = value

        save_all_profiles(self.all_profiles)
        logging.getLogger().setLevel(LOG_LEVELS[self.log_level.get()])
        self.values['log_level'] = self.log_level.get()

        self._running = True
        self._cancelled = False
        self.run_button.config(state=tk.DISABLED)

        thread = Thread(target=self.run_query)
        thread.start()

    def on_cancel(self):
        """Close the dialog and ask a running query to stop."""
        self._cancelled = True
        self.top.destroy()

    def _call_in_ui(self, callback, *args):
        """Schedule ``callback(*args)`` to run in the Tk event loop."""
        try:
            self.top.after(0, callback, *args)
        except (tk.TclError, RuntimeError):
            # The dialog or the event loop is already gone; nothing to update.
            logging.debug("UI unavailable; dropped call to %s", getattr(callback, '__name__', callback))

    def update_status(self, message):
        """Show *message* in the status line; safe to call from the worker."""
        self._call_in_ui(self._set_status, message)

    def _set_status(self, message):
        """UI-thread callback that updates the status variable."""
        self.status.set(message)

    def _show_error(self, title, message):
        """UI-thread callback: report a failure and allow another attempt."""
        self._running = False
        if self._cancelled:
            return  # the dialog is closed; there is nobody to tell
        self.run_button.config(state=tk.NORMAL)
        messagebox.showerror(title, message)

    def _show_success(self, output_path):
        """UI-thread callback: report completion and close the dialog."""
        self._running = False
        if self._cancelled:
            return
        messagebox.showinfo("Success", f"C-FIND complete.\nResults saved to:\n{output_path}")
        self.top.destroy()

    def _write_study_row(self, writer, identifier):
        """Write one C-FIND match as a row of the results file."""
        writer.writerow([
            str(getattr(identifier, 'PatientID', '')).strip(),
            str(getattr(identifier, 'PatientName', '')).strip(),
            str(getattr(identifier, 'AccessionNumber', '')).strip(),
            str(getattr(identifier, 'StudyDate', '')).strip(),
            str(getattr(identifier, 'StudyInstanceUID', '')).strip(),
            self._format_modalities(getattr(identifier, 'ModalitiesInStudy', '')),
            str(getattr(identifier, 'NumberOfStudyRelatedInstances', '')).strip(),
        ])

    def run_query(self):
        """Worker-thread entry point: run the sweep and report the outcome.

        Reads its settings from ``self.values``. Any unexpected exception is
        logged and reported through the UI instead of silently ending the
        thread.
        """
        try:
            self._run_query()
        except Exception:
            logging.exception("Unexpected error during DICOM query.")
            self.update_status("Query failed.")
            self._call_in_ui(
                self._show_error,
                "DICOM Error",
                "Query failed unexpectedly. See the log file for details.",
            )

    def _run_query(self):
        """Associate with the remote node and query it hour by hour."""
        config = self.values
        self.update_status("Initializing DICOM query...")

        # Validate everything before opening an association so that no early
        # return can leave one open.
        try:
            port, start_date = self._parse_settings(config)
        except ValueError as exc:
            logging.error("Invalid configuration: %s", exc)
            self.update_status("Invalid configuration.")
            self._call_in_ui(self._show_error, "Error", str(exc))
            return

        now = datetime.now()
        if start_date >= now:
            logging.error("Start date %s is in the future. Aborting query.", start_date)
            self.update_status("Start date is in the future.")
            self._call_in_ui(self._show_error, "Date Error", "Start date must be earlier than today.")
            return

        ae = AE(ae_title=config['LocalAET'])
        ae.add_requested_context(StudyRootQueryRetrieveInformationModelFind)

        handlers = [
            (evt.EVT_ACCEPTED, lambda x: logging.info("Association accepted.")),
            (evt.EVT_REJECTED, lambda x: logging.warning("Association rejected.")),
            (evt.EVT_RELEASED, lambda x: logging.info("Association released.")),
            (evt.EVT_ABORTED, lambda x: logging.error("Association aborted.")),
        ]

        assoc = ae.associate(
            config['RemoteIP'],
            port,
            ae_title=config['RemoteAET'],
            evt_handlers=handlers
        )

        if not assoc.is_established:
            logging.error("DICOM Association failed.")
            self.update_status("Association failed.")
            self._call_in_ui(self._show_error, "DICOM Error", "Association to PACS failed.")
            return

        output_path = os.path.join(os.getcwd(), generate_output_filename())
        connection_lost = False
        try:
            logging.info("Starting C-FIND loop from %s to %s", start_date, now)
            with open(output_path, 'w', newline='') as f:
                writer = csv.writer(f, delimiter='|')
                writer.writerow([
                    'PatientID', 'PatientName', 'AccessionNumber', 'StudyDate',
                    'StudyInstanceUID', 'ModalitiesInStudy', 'NumberOfStudyRelatedInstances'
                ])

                while start_date < now and not self._cancelled:
                    end_date = start_date + timedelta(hours=1)
                    ds = create_cfind_dataset(start_date, end_date)
                    self.update_status(f"Querying: {start_date.strftime('%Y-%m-%d %H:%M')}")

                    try:
                        responses = assoc.send_c_find(ds, StudyRootQueryRetrieveInformationModelFind)
                        for status, identifier in responses:
                            # 0xFF00 / 0xFF01 are the statuses that carry a match.
                            if status and status.Status in (0xFF00, 0xFF01) and identifier:
                                try:
                                    self._write_study_row(writer, identifier)
                                    # Flush per row so partial results survive a crash.
                                    f.flush()
                                except Exception:
                                    logging.exception("Error writing to output file.")
                            elif status:
                                logging.debug("C-FIND response: 0x%04X", status.Status)
                    except Exception:
                        logging.exception("Exception during C-FIND from %s to %s", start_date, end_date)
                        if not assoc.is_established:
                            # Every further request would fail the same way;
                            # stop instead of logging one traceback per hour.
                            connection_lost = True
                            break

                    start_date = end_date
        finally:
            # Runs on every exit path, including cancellation and errors.
            if assoc.is_established:
                assoc.release()

        if self._cancelled:
            self._running = False
            logging.info("C-FIND cancelled. Partial results saved to: %s", output_path)
            return

        if connection_lost:
            self.update_status("Connection lost.")
            self._call_in_ui(
                self._show_error,
                "DICOM Error",
                f"Connection to PACS was lost.\nPartial results saved to:\n{output_path}",
            )
            return

        self.update_status("Query complete.")
        logging.info("C-FIND complete. Results saved to: %s", output_path)
        self._call_in_ui(self._show_success, output_path)
|
|
|
|
# === Main ===
|
|
if __name__ == "__main__":
    # The root window only exists to own the dialog, so keep it hidden.
    root = tk.Tk()
    root.withdraw()
    dialog = ConfigDialog(root)
    # A hidden root can never be closed by the user, so mainloop() would keep
    # the process alive after the dialog is gone. Run the event loop only
    # until the dialog window is destroyed, then tear down the root.
    root.wait_window(dialog.top)
    root.destroy()
|