diff --git a/paper3_phase5b_refined.py b/paper3_phase5b_refined.py
index 21d8f66..23ea495 100644
--- a/paper3_phase5b_refined.py
+++ b/paper3_phase5b_refined.py
@@ -25,11 +25,20 @@ Usage:
     python paper3_phase5b_refined.py
 """
 
-import json, sys, math, time, random
+import json, os, sys, math, time, random
 from collections import defaultdict
 
-BQ_PROJECT = "goddard-gap"
-DATA_PROJECT = "physionet-data"
+# PostgreSQL connection string (libpq DSN). Override with env var.
+# e.g. "host=localhost port=5432 dbname=mimic user=postgres password=..."
+PG_DSN = os.environ.get("MIMIC_PG_DSN", "dbname=mimic3")
+# Schema holding the stock MIMIC-III v1.3 tables (admissions, icustays,
+# labevents, chartevents, inputevents_mv, inputevents_cv, outputevents,
+# patients, d_items, ...).
+MIMIC_SCHEMA = os.environ.get("MIMIC_SCHEMA", "mimiciii")
+# Schema holding the locally built derived tables (sapsii, sepsis3,
+# norepinephrine_dose, weight_durations, ...); see sql/schemas.sql.
+# Defaults to the same schema as MIMIC-III itself.
+DERIVED_SCHEMA = os.environ.get("DERIVED_SCHEMA", MIMIC_SCHEMA)
 
 H_SNAPSHOT = 24
 H_PEAK_NE = 12
@@ -39,113 +48,146 @@ N_FOLDS = 5
 N_BOOTSTRAP = 1000
 OUT_FILE = "paper3_phase5b_refined.json"
 
-NE_ITEMID = 221906
 LACTATE_ID = 50813
-MAP_ITEMIDS = [220052, 220181, 225312]
-HR_ITEMIDS = [220045]
+# MAP: 52, 456, 6702 = CareVue; 220052, 220181, 225312 = MetaVision.
+MAP_ITEMIDS = [52, 456, 6702, 220052, 220181, 225312]
+# HR: 211 = CareVue; 220045 = MetaVision.
+HR_ITEMIDS = [211, 220045]
 
 
-def run_bq(sql, label=""):
+_PG_CONN = None
+def _pg_conn():
+    global _PG_CONN
+    if _PG_CONN is None or getattr(_PG_CONN, "closed", 0):
+        import psycopg2
+        _PG_CONN = psycopg2.connect(PG_DSN)
+        _PG_CONN.set_session(readonly=True, autocommit=True)
+    return _PG_CONN
+
+
+def run_pg(sql, label=""):
     try:
-        from google.cloud import bigquery
-        client = bigquery.Client(project=BQ_PROJECT)
+        import psycopg2.extras
+        conn = _pg_conn()
         t0 = time.time()
-        rows = [dict(r.items()) for r in client.query(sql).result()]
+        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
+            cur.execute(sql)
+            rows = [dict(r) for r in cur.fetchall()] if cur.description else []
         print(f" {label:32s} {len(rows):>8,d} rows ({time.time()-t0:.1f}s)")
         return rows
     except Exception as e:
-        print(f"[BQ ERROR] {label}: {e}"); return []
+        print(f"[PG ERROR] {label}: {e}"); return []
 
 
-# ── Queries (same as Phase 5, SAPS Q4 pre-filtered) ────────────────────────
+# ── Queries (PostgreSQL / MIMIC-III v1.3, SAPS Q4 pre-filtered) ────────────
+#
+# Notes on the port from BigQuery / MIMIC-IV:
+#   * `stay_id` (MIMIC-IV) is `icustay_id` in MIMIC-III; we alias to
+#     `stay_id` so the downstream Python is unchanged.
+#   * `mimiciv_3_1_icu.inputevents` (single table, mcg/kg/min) is split
+#     across `inputevents_mv` and `inputevents_cv` in MIMIC-III with
+#     different itemids and units. The `norepinephrine_dose` table built
+#     by sql/build_sepsis3.sql already merges both eras and normalises
+#     rates to mcg/kg/min, so we use that instead of the raw inputs.
+#   * Weight in MIMIC-IV is read from chartevents itemids 226512/224639
+#     (MetaVision-only). In MIMIC-III those itemids cover only the MV
+#     half of the cohort, so we use the `weight_durations` table built by
+#     sql/build_sepsis3.sql (admit + daily + neonate + echo, both eras).
+#   * `pat.anchor_age` (MIMIC-IV) → computed from `pat.dob` against
+#     `icu.intime`. MIMIC-III shifts dob backwards by ~300 years for
+#     patients ≥89; we cap the result at 120.
+#   * PostgreSQL >= 14 returns `numeric` from EXTRACT(), which psycopg2
+#     maps to decimal.Decimal. Age and the minute offsets are cast to
+#     float8 so rows carry plain Python floats on every server version.
+
 
 def q_cohort():
     return f"""
 WITH weight_first AS (
-    SELECT ce.stay_id, ANY_VALUE(ce.valuenum) AS weight_kg
-    FROM `{DATA_PROJECT}.mimiciv_3_1_icu.chartevents` ce
-    JOIN `{DATA_PROJECT}.mimiciv_3_1_icu.icustays` icu ON ce.stay_id = icu.stay_id
-    JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sepsis3` s3 ON s3.stay_id = ce.stay_id
-    WHERE s3.sepsis3 = TRUE AND ce.itemid IN (226512, 224639)
-      AND ce.valuenum BETWEEN 30 AND 300
-      AND ce.charttime BETWEEN icu.intime AND TIMESTAMP_ADD(icu.intime, INTERVAL 24 HOUR)
-    GROUP BY ce.stay_id
+    SELECT wd.icustay_id, MIN(wd.weight) AS weight_kg
+    FROM {DERIVED_SCHEMA}.weight_durations wd
+    JOIN {MIMIC_SCHEMA}.icustays icu ON icu.icustay_id = wd.icustay_id
+    WHERE wd.weight BETWEEN 30 AND 300
+      AND wd.starttime <= icu.intime + INTERVAL '24 hours'
+      AND wd.endtime >= icu.intime
+    GROUP BY wd.icustay_id
 )
-SELECT icu.stay_id, icu.subject_id, icu.intime,
-       pat.anchor_age AS age, pat.gender,
+SELECT icu.icustay_id AS stay_id, icu.subject_id, icu.intime,
+       LEAST(120.0, EXTRACT(EPOCH FROM (icu.intime - pat.dob)) / 31556952.0)::float8 AS age,
+       pat.gender,
        saps.sapsii, adm.hospital_expire_flag AS died,
        COALESCE(wf.weight_kg, 75.0) AS weight_kg
-FROM `{DATA_PROJECT}.mimiciv_3_1_derived.sepsis3` s3
-JOIN `{DATA_PROJECT}.mimiciv_3_1_icu.icustays` icu ON icu.stay_id = s3.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_hosp.admissions` adm ON adm.hadm_id = icu.hadm_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_hosp.patients` pat ON pat.subject_id = icu.subject_id
-LEFT JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sapsii` saps ON saps.stay_id = icu.stay_id
-LEFT JOIN weight_first wf ON wf.stay_id = icu.stay_id
+FROM {DERIVED_SCHEMA}.sepsis3 s3
+JOIN {MIMIC_SCHEMA}.icustays icu ON icu.icustay_id = s3.icustay_id
+JOIN {MIMIC_SCHEMA}.admissions adm ON adm.hadm_id = icu.hadm_id
+JOIN {MIMIC_SCHEMA}.patients pat ON pat.subject_id = icu.subject_id
+LEFT JOIN {DERIVED_SCHEMA}.sapsii saps ON saps.icustay_id = icu.icustay_id
+LEFT JOIN weight_first wf ON wf.icustay_id = icu.icustay_id
 WHERE s3.sepsis3 = TRUE
-  AND TIMESTAMP_DIFF(icu.outtime, icu.intime, HOUR) >= {H_SNAPSHOT}
+  AND EXTRACT(EPOCH FROM (icu.outtime - icu.intime)) / 3600.0 >= {H_SNAPSHOT}
   AND saps.sapsii IS NOT NULL AND saps.sapsii >= 48
 """
 
 
 def q_ne():
     return f"""
-SELECT ie.stay_id,
-       TIMESTAMP_DIFF(ie.starttime, icu.intime, MINUTE) AS start_min,
-       TIMESTAMP_DIFF(ie.endtime, icu.intime, MINUTE) AS end_min,
-       ie.rate
-FROM `{DATA_PROJECT}.mimiciv_3_1_icu.inputevents` ie
-JOIN `{DATA_PROJECT}.mimiciv_3_1_icu.icustays` icu ON icu.stay_id = ie.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sepsis3` s3 ON s3.stay_id = ie.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sapsii` saps ON saps.stay_id = icu.stay_id
+SELECT nd.icustay_id AS stay_id,
+       (EXTRACT(EPOCH FROM (nd.starttime - icu.intime)) / 60.0)::float8 AS start_min,
+       (EXTRACT(EPOCH FROM (nd.endtime - icu.intime)) / 60.0)::float8 AS end_min,
+       nd.vaso_rate AS rate
+FROM {DERIVED_SCHEMA}.norepinephrine_dose nd
+JOIN {MIMIC_SCHEMA}.icustays icu ON icu.icustay_id = nd.icustay_id
+JOIN {DERIVED_SCHEMA}.sepsis3 s3 ON s3.icustay_id = nd.icustay_id
+JOIN {DERIVED_SCHEMA}.sapsii saps ON saps.icustay_id = nd.icustay_id
 WHERE s3.sepsis3 = TRUE AND saps.sapsii >= 48
-  AND ie.itemid = {NE_ITEMID} AND ie.rate > 0
-  AND ie.starttime BETWEEN icu.intime AND TIMESTAMP_ADD(icu.intime, INTERVAL 30 HOUR)
+  AND nd.vaso_rate > 0
+  AND nd.starttime BETWEEN icu.intime AND icu.intime + INTERVAL '30 hours'
 """
 
 
 def q_fluid_out():
     return f"""
-SELECT oe.stay_id, SUM(oe.value) AS fluid_out_ml
-FROM `{DATA_PROJECT}.mimiciv_3_1_icu.outputevents` oe
-JOIN `{DATA_PROJECT}.mimiciv_3_1_icu.icustays` icu ON icu.stay_id = oe.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sepsis3` s3 ON s3.stay_id = oe.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sapsii` saps ON saps.stay_id = icu.stay_id
+SELECT oe.icustay_id AS stay_id, SUM(oe.value) AS fluid_out_ml
+FROM {MIMIC_SCHEMA}.outputevents oe
+JOIN {MIMIC_SCHEMA}.icustays icu ON icu.icustay_id = oe.icustay_id
+JOIN {DERIVED_SCHEMA}.sepsis3 s3 ON s3.icustay_id = oe.icustay_id
+JOIN {DERIVED_SCHEMA}.sapsii saps ON saps.icustay_id = icu.icustay_id
 WHERE s3.sepsis3 = TRUE AND saps.sapsii >= 48
   AND oe.value > 0
-  AND oe.charttime BETWEEN icu.intime AND TIMESTAMP_ADD(icu.intime, INTERVAL {H_SNAPSHOT} HOUR)
-GROUP BY oe.stay_id
+  AND oe.charttime BETWEEN icu.intime AND icu.intime + INTERVAL '{H_SNAPSHOT} hours'
+GROUP BY oe.icustay_id
 """
 
 
 def q_vitals():
     ids = ",".join(str(x) for x in MAP_ITEMIDS + HR_ITEMIDS)
     return f"""
-SELECT ce.stay_id, ce.itemid, AVG(ce.valuenum) AS val
-FROM `{DATA_PROJECT}.mimiciv_3_1_icu.chartevents` ce
-JOIN `{DATA_PROJECT}.mimiciv_3_1_icu.icustays` icu ON icu.stay_id = ce.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sepsis3` s3 ON s3.stay_id = ce.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sapsii` saps ON saps.stay_id = icu.stay_id
+SELECT ce.icustay_id AS stay_id, ce.itemid, AVG(ce.valuenum) AS val
+FROM {MIMIC_SCHEMA}.chartevents ce
+JOIN {MIMIC_SCHEMA}.icustays icu ON icu.icustay_id = ce.icustay_id
+JOIN {DERIVED_SCHEMA}.sepsis3 s3 ON s3.icustay_id = ce.icustay_id
+JOIN {DERIVED_SCHEMA}.sapsii saps ON saps.icustay_id = icu.icustay_id
 WHERE s3.sepsis3 = TRUE AND saps.sapsii >= 48
   AND ce.itemid IN ({ids})
   AND ce.valuenum IS NOT NULL AND ce.valuenum > 0
-  AND ce.charttime BETWEEN TIMESTAMP_ADD(icu.intime, INTERVAL 20 HOUR)
-                       AND TIMESTAMP_ADD(icu.intime, INTERVAL 28 HOUR)
-GROUP BY ce.stay_id, ce.itemid
+  AND ce.charttime BETWEEN icu.intime + INTERVAL '20 hours'
+                       AND icu.intime + INTERVAL '28 hours'
+GROUP BY ce.icustay_id, ce.itemid
 """
 
 
 def q_lactate():
     return f"""
-SELECT icu.stay_id,
-       TIMESTAMP_DIFF(le.charttime, icu.intime, MINUTE) AS offset_min,
+SELECT icu.icustay_id AS stay_id,
+       (EXTRACT(EPOCH FROM (le.charttime - icu.intime)) / 60.0)::float8 AS offset_min,
        le.valuenum AS val
-FROM `{DATA_PROJECT}.mimiciv_3_1_hosp.labevents` le
-JOIN `{DATA_PROJECT}.mimiciv_3_1_icu.icustays` icu ON icu.hadm_id = le.hadm_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sepsis3` s3 ON s3.stay_id = icu.stay_id
-JOIN `{DATA_PROJECT}.mimiciv_3_1_derived.sapsii` saps ON saps.stay_id = icu.stay_id
+FROM {MIMIC_SCHEMA}.labevents le
+JOIN {MIMIC_SCHEMA}.icustays icu ON icu.hadm_id = le.hadm_id
+JOIN {DERIVED_SCHEMA}.sepsis3 s3 ON s3.icustay_id = icu.icustay_id
+JOIN {DERIVED_SCHEMA}.sapsii saps ON saps.icustay_id = icu.icustay_id
 WHERE s3.sepsis3 = TRUE AND saps.sapsii >= 48
   AND le.itemid = {LACTATE_ID} AND le.valuenum IS NOT NULL
-  AND le.charttime BETWEEN icu.intime AND TIMESTAMP_ADD(icu.intime, INTERVAL 30 HOUR)
+  AND le.charttime BETWEEN icu.intime AND icu.intime + INTERVAL '30 hours'
 """
 
 
@@ -300,11 +342,11 @@ def main():
     print("█"*78)
 
     print(f"\n[1] Fetching data...")
-    cohort_rows = run_bq(q_cohort(), "cohort")
-    ne_rows = run_bq(q_ne(), "NE events")
-    fout_rows = run_bq(q_fluid_out(), "Fluid out")
-    vital_rows = run_bq(q_vitals(), "Vitals h20-28")
-    lac_rows = run_bq(q_lactate(), "Lactate")
+    cohort_rows = run_pg(q_cohort(), "cohort")
+    ne_rows = run_pg(q_ne(), "NE events")
+    fout_rows = run_pg(q_fluid_out(), "Fluid out")
+    vital_rows = run_pg(q_vitals(), "Vitals h20-28")
+    lac_rows = run_pg(q_lactate(), "Lactate")
 
     cohort = {r["stay_id"]: dict(r) for r in cohort_rows}
     print(f"\n[2] Cohort: {len(cohort):,} SAPS Q4 sepsis-3")