#Building an AI Clinical Documentation Pipeline
I built a serverless pipeline that transforms clinical conversations into clinical documentation using AWS's AI services, with a focus on creating queryable, analytics-ready data in Apache Iceberg format.
#The Challenge
Care managers record conversations that need to be converted into formal SOAP notes (Subjective, Objective, Assessment, Plan). The challenge isn't just transcription - it's creating an automated data pipeline that:
- Accurately captures medical terminology and speaker context
- Extracts standardized medical codes (ICD-10, RxNorm, SNOMED CT)
- Stores the data in a format optimized for healthcare analytics
- Maintains HIPAA compliance throughout the workflow
#Architecture
#Core Components
#1. Medical Transcription
I configured Amazon Transcribe Medical specifically for clinical conversations:
response = transcribe.start_medical_transcription_job(
MedicalTranscriptionJobName=job_name,
LanguageCode="en-US", # Only valid option for medical transcription
Media={"MediaFileUri": str(media_uri)},
OutputBucketName=bucket,
Settings={
"ShowSpeakerLabels": True,
"MaxSpeakerLabels": 2,
},
Specialty="PRIMARYCARE", # Required for medical transcription
Type="CONVERSATION" # Specifies doctor-patient dialogue
)
This setup captures both the medical terminology and the conversation dynamics between care manager and patient.
#2. Parallel SOAP Note Processing
The system processes SOAP sections independently to enable future specialization:
def create_generate_soap_note_task(self) -> sfn.Chain:
section_generators = []
for section in ["subjective", "objective", "assessment", "plan"]:
generate_section = get_anthropic_claude_invoke_chain(
self,
f"Generate{section.capitalize()}Section",
prompt=section_specific_prompt,
max_tokens_to_sample=2000,
temperature=0
)
section_generators.append(generate_section)
#3. Medical Entity Extraction
#Processing the Medical Text
First, we need to break our text into manageable chunks since AWS Comprehend Medical has a 5000-character limit:
def process_medical_text(text: str) -> List[str]:
# AWS Comprehend Medical has a 5000-character limit
MAX_CHUNK_SIZE = 5000
return [text[i:i+MAX_CHUNK_SIZE] for i in range(0, len(text), MAX_CHUNK_SIZE)]
#Three Types of Detection
#1. General Medical Entity Detection
Our first pass identifies medical terminology and concepts. This includes things like conditions, medications, and anatomical references:
def detect_medical_entities(text: str) -> List[Dict[str, Any]]:
response = comprehend_medical.detect_entities_v2(Text=text)
# We only keep entities with high confidence scores
return [
entity for entity in response["Entities"]
if entity["Score"] >= 0.8 # 80% confidence threshold
]
This gives us results like:
{
"Text": "blood pressure",
"Category": "TEST_TREATMENT_PROCEDURE",
"Type": "TEST_NAME",
"Score": 0.97,
"Traits": [
{"Name": "VITAL_SIGN", "Score": 0.92}
]
}
#2. ICD-10 Code Detection
Next, we look specifically for diagnoses that map to ICD-10 codes:
def extract_icd10_codes(text: str) -> List[Dict[str, Any]]:
response = comprehend_medical.infer_icd10_cm(Text=text)
# Extract both the diagnosis and its ICD-10 code
codes = []
for entity in response["Entities"]:
if entity["Score"] >= 0.8:
codes.append({
"Text": entity["Text"], # e.g., "type 2 diabetes"
"ICD10": entity["ICD10CMConcepts"][0]["Code"], # e.g., "E11.9"
"Score": entity["Score"]
})
return codes
#3. Medication Code Detection
Finally, we extract medication information and map it to RxNorm codes:
def extract_medications(text: str) -> List[Dict[str, Any]]:
response = comprehend_medical.infer_rx_norm(Text=text)
medications = []
for med in response["Entities"]:
if med["Score"] >= 0.8:
medications.append({
"Text": med["Text"], # e.g., "metformin 500mg"
"RxNorm": med["RxNormConcepts"][0]["Code"], # e.g., "203563"
"Attributes": { # Extra info like dosage, frequency
attr["Type"]: attr["Text"]
for attr in med.get("Attributes", [])
if attr["Score"] >= 0.8
}
})
return medications
#Handling the SOAP Note Sections
We process each section of the SOAP note through all three detectors:
def process_soap_sections(soap_note: Dict[str, str]) -> Dict[str, Dict[str, List]]:
processed_sections = {}
for section_name, text in soap_note.items():
section_results = {
"entities": [],
"diagnoses": [],
"medications": []
}
# Process each chunk of the section text
for chunk in process_medical_text(text):
# Run all three detectors
section_results["entities"].extend(detect_medical_entities(chunk))
section_results["diagnoses"].extend(extract_icd10_codes(chunk))
section_results["medications"].extend(extract_medications(chunk))
processed_sections[section_name] = section_results
return processed_sections
#Deduplication and Confidence Filtering
Before finalizing our results, we remove duplicates while keeping the highest confidence scores:
def deduplicate_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
unique_entities = {}
for entity in entities:
entity_key = (entity["Text"].lower(), entity.get("Category", "UNKNOWN"))
if (entity_key not in unique_entities or
entity["Score"] > unique_entities[entity_key]["Score"]):
unique_entities[entity_key] = entity
return list(unique_entities.values())
#Example Output
For a SOAP note section like:
Patient reports increased blood pressure. Currently taking metformin 500mg twice daily
for type 2 diabetes. Complains of occasional chest pain.
We extract structured data like:
{
"subjective": {
"entities": [
{"Text": "blood pressure", "Category": "TEST_TREATMENT_PROCEDURE"},
{"Text": "chest pain", "Category": "MEDICAL_CONDITION"}
],
"diagnoses": [
{
"Text": "type 2 diabetes",
"ICD10": "E11.9",
"Score": 0.95
}
],
"medications": [
{
"Text": "metformin 500mg twice daily",
"RxNorm": "203563",
"Attributes": {
"DOSAGE": "500mg",
"FREQUENCY": "twice daily"
}
}
]
}
}
#Error Handling
We wrap each API call with error handling to ensure the pipeline continues even if one detection fails:
def safe_api_call(func: Callable, text: str) -> List[Dict[str, Any]]:
try:
return func(text)
except ClientError as e:
logger.error(f"API call failed: {str(e)}")
return [] # Return empty list on failure
#Preparing for Storage
Finally, we format the extracted data for our Iceberg tables:
def prepare_for_storage(processed_sections: Dict[str, Dict[str, List]]) -> Dict[str, Any]:
return {
"entities": json.dumps([
entity
for section in processed_sections.values()
for entity in section["entities"]
]),
"medical_codes": json.dumps({
"diagnoses": [
dx
for section in processed_sections.values()
for dx in section["diagnoses"]
],
"medications": [
med
for section in processed_sections.values()
for med in section["medications"]
]
})
}
This structured data flows into our Iceberg tables, where we can use Athena's JSON functions to analyze:
- Diagnosis patterns across patient populations
- Medication prescribing trends
- Common symptoms and conditions
- Treatment effectiveness over time
#4. Data Storage with Apache Iceberg
A final Lambda function converts complex clinical data into queryable Iceberg tables using the AWS SDK for pandas (awswrangler), a library that integrates pandas functionality to AWS services. The library is easily added via a Lambda layer.
def write_to_iceberg(record: Dict[str, Any], bucket: str, database: str, table: str):
df = pd.DataFrame([record])
# Store complex objects as queryable JSON
df["soap_note"] = df["soap_note"].apply(json.dumps)
df["medical_codes"] = df["medical_codes"].apply(json.dumps)
# Define schema for clinical data
dtype = {
"external_call_id": "string",
"processed_at": "timestamp",
"soap_note": "string", # JSON containing SOAP sections
"medical_codes": "string", # JSON containing extracted codes
}
# Configure Iceberg table with optimizations
wr.athena.to_iceberg(
df=df,
database=database,
table=table,
schema_evolution=True,
encryption="SSE_S3",
additional_table_properties={
"write_target_data_file_size_bytes": "536870912",
"format-version": "2",
}
)
The pipeline stores SOAP notes and medical codes as JSON strings within Iceberg tables, enabling complex queries through Athena's JSON functions. This approach provides several benefits:
- Schema Flexibility: New fields can be added to the SOAP notes or medical codes without requiring table alterations
- Optimized Storage: Large clinical text is compressed and stored efficiently
- SQL Accessibility: Complex nested data becomes queryable through standard SQL:
SELECT
external_call_id,
processed_at,
-- Extract specific SOAP sections
JSON_EXTRACT_SCALAR(soap_note, '$.subjective') as subjective,
JSON_EXTRACT_SCALAR(soap_note, '$.objective') as objective,
-- Extract and analyze medical codes
JSON_EXTRACT(medical_codes, '$.icd10_codes') as diagnoses,
JSON_EXTRACT(medical_codes, '$.rx_codes') as medications
FROM clinical_data.soap_notes
WHERE processed_at > CURRENT_DATE - INTERVAL '30' DAY;
This setup enables downstream analytics while maintaining the original document structure. The Iceberg format also provides ACID transactions and time travel capabilities, facilitating data governance.
#Future Work
I'm currently exploring:
-
Analytics Enhancements
- Creating standardized clinical metrics views
- Building trend analysis pipelines
- Implementing quality scoring for SOAP notes
-
Model Improvements
- Testing specialized medical models for each SOAP section
- Adding automated quality checks for generated notes
- Expanding medical code extraction coverage
-
Data Pipeline Optimization
- Implementing data partitioning strategies
- Adding data quality validation steps
- Building automated testing for the extraction pipeline