Shawn Albert

Building an AI Clinical Documentation Pipeline

Shawn Albert · Mon Nov 04 2024

Serverless AI pipeline that transforms patient conversations into clinical documentation using AWS's AI services for automated medical transcription, SOAP note generation, and medical entity extraction.

#Building an AI Clinical Documentation Pipeline

I built a serverless pipeline that transforms clinical conversations into clinical documentation using AWS's AI services, with a focus on creating queryable, analytics-ready data in Apache Iceberg format.

#The Challenge

Care managers record conversations that need to be converted into formal SOAP notes (Subjective, Objective, Assessment, Plan). The challenge isn't just transcription - it's creating an automated data pipeline that:

  • Accurately captures medical terminology and speaker context
  • Extracts standardized medical codes (ICD-10, RxNorm, SNOMED CT)
  • Stores the data in a format optimized for healthcare analytics
  • Maintains HIPAA compliance throughout the workflow

#Architecture

[Architecture diagram, AWS Cloud: a care manager phone call produces an audio file (MP3) in a landing S3 bucket. An AWS Step Functions workflow then runs Amazon Transcribe Medical (transcribed audio), Amazon Bedrock (clinical SOAP notes), and Amazon Comprehend Medical (extracted medical entities), writes processed files, and appends the outputs to an Apache Iceberg table queried through Amazon Athena. Pipeline success or failure notifications go through Amazon Simple Notification Service and AWS Chatbot to a Slack channel, and an Amazon CloudWatch alarm notifies of SNS failures.]

#Core Components

#1. Medical Transcription

I configured Amazon Transcribe Medical specifically for clinical conversations:

import boto3

transcribe = boto3.client("transcribe")

# job_name, media_uri, and bucket are supplied by the calling workflow step
response = transcribe.start_medical_transcription_job(
    MedicalTranscriptionJobName=job_name,
    LanguageCode="en-US",  # Only valid option for medical transcription
    Media={"MediaFileUri": str(media_uri)},
    OutputBucketName=bucket,
    Settings={
        "ShowSpeakerLabels": True,  # Label who is speaking
        "MaxSpeakerLabels": 2,  # Care manager and patient
    },
    Specialty="PRIMARYCARE",  # Required for medical transcription
    Type="CONVERSATION"  # Dialogue between two speakers rather than dictation
)

This setup captures both the medical terminology and the conversation dynamics between care manager and patient.
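To make the speaker context concrete, here is a minimal sketch of turning the transcription output into speaker turns. It assumes the output JSON follows the speaker-partitioned layout that Amazon Transcribe documents (`results.items` plus `results.speaker_labels.segments`); `speaker_turns` is a hypothetical helper, not part of the pipeline shown in this post.

```python
from typing import Any, Dict, List, Tuple


def speaker_turns(transcript_json: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Group transcript items into (speaker, text) turns.

    Assumes the speaker-partitioned output layout: results.items holds the
    words, results.speaker_labels.segments maps word start times to speakers.
    """
    results = transcript_json["results"]

    # Map each word's start time to the speaker label of its segment.
    speaker_by_start = {
        item["start_time"]: segment["speaker_label"]
        for segment in results["speaker_labels"]["segments"]
        for item in segment["items"]
    }

    turns: List[Tuple[str, str]] = []
    current_speaker = None
    for item in results["items"]:
        content = item["alternatives"][0]["content"]
        if item["type"] == "punctuation":
            # Punctuation carries no timestamp; attach it to the current turn.
            if turns:
                speaker, text = turns[-1]
                turns[-1] = (speaker, text + content)
            continue
        speaker = speaker_by_start.get(item["start_time"], current_speaker or "unknown")
        if turns and speaker == current_speaker:
            prev_speaker, text = turns[-1]
            turns[-1] = (prev_speaker, f"{text} {content}")
        else:
            turns.append((speaker, content))
            current_speaker = speaker
    return turns


def _word(start: str, content: str) -> Dict[str, Any]:
    """Build a sample word item (test data only)."""
    return {"start_time": start, "type": "pronunciation",
            "alternatives": [{"content": content}]}


sample = {
    "results": {
        "speaker_labels": {
            "segments": [
                {"speaker_label": "spk_0", "items": [{"start_time": "0.0"}]},
                {"speaker_label": "spk_1",
                 "items": [{"start_time": "1.0"}, {"start_time": "1.5"}]},
            ]
        },
        "items": [
            _word("0.0", "Hello"),
            {"type": "punctuation", "alternatives": [{"content": "."}]},
            _word("1.0", "Hi"),
            _word("1.5", "there"),
        ],
    }
}

for speaker, text in speaker_turns(sample):
    print(f"{speaker}: {text}")
```

Feeding turns like these to the note generator, rather than one undifferentiated block of text, keeps it clear who said what.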

#2. Parallel SOAP Note Processing

The system processes SOAP sections independently to enable future specialization:

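def create_generate_soap_note_task(self) -> sfn.Chain:
    section_generators = []
    
    for section in ["subjective", "objective", "assessment", "plan"]:
        # get_anthropic_claude_invoke_chain is a project helper that builds the
        # Bedrock invocation; section_specific_prompt is that section's prompt text
        generate_section = get_anthropic_claude_invoke_chain(
            self,
            f"Generate{section.capitalize()}Section",
            prompt=section_specific_prompt,
            max_tokens_to_sample=2000,
            temperature=0
        )
        section_generators.append(generate_section)

    # Assumed completion (the original listing stops after the loop): run the
    # four generators as branches of one Parallel state and return it as a chain
    parallel = sfn.Parallel(self, "GenerateSoapNote")
    for generator in section_generators:
        parallel.branch(generator)
    return sfn.Chain.start(parallel)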
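As a rough illustration of what section-specific prompts could look like, the sketch below builds one prompt per SOAP section from a shared transcript. The instruction wording, `SECTION_INSTRUCTIONS`, and `build_section_prompt` are all hypothetical; the pipeline's real prompts are not shown in this post.

```python
from typing import Dict

# Illustrative instructions only; these are assumptions, not the pipeline's prompts.
SECTION_INSTRUCTIONS: Dict[str, str] = {
    "subjective": "Summarize what the patient reports: symptoms, concerns, and relevant history.",
    "objective": "List measurable findings mentioned in the call, such as vitals or test results.",
    "assessment": "Summarize the clinical impression supported by the conversation.",
    "plan": "List agreed next steps, follow-ups, and medication changes.",
}


def build_section_prompt(section: str, transcript: str) -> str:
    """Build the prompt for one SOAP section from the shared transcript."""
    if section not in SECTION_INSTRUCTIONS:
        raise ValueError(f"Unknown SOAP section: {section}")
    return (
        f"You are drafting the {section.capitalize()} section of a SOAP note.\n"
        f"{SECTION_INSTRUCTIONS[section]}\n"
        "Use only information present in the transcript.\n\n"
        f"Transcript:\n{transcript}"
    )


# One prompt per section, all derived from the same transcript.
prompts = {
    section: build_section_prompt(section, "spk_0: How are you feeling today?")
    for section in SECTION_INSTRUCTIONS
}
```

Keeping the instructions in a single mapping means a section can later get its own model or prompt without touching the others.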

#3. Medical Entity Extraction

#Processing the Medical Text

First, we need to break our text into manageable chunks, since Amazon Comprehend Medical limits how much text each request can contain. The pipeline uses 5000-character chunks:

from typing import List

def process_medical_text(text: str) -> List[str]:
    # Keep each chunk within the 5000-character size the pipeline sends per request
    MAX_CHUNK_SIZE = 5000
    return [text[i:i+MAX_CHUNK_SIZE] for i in range(0, len(text), MAX_CHUNK_SIZE)]

#Three Types of Detection

#1. General Medical Entity Detection

Our first pass identifies medical terminology and concepts. This includes things like conditions, medications, and anatomical references:

def detect_medical_entities(text: str) -> List[Dict[str, Any]]:
    response = comprehend_medical.detect_entities_v2(Text=text)
    
    # We only keep entities with high confidence scores
    return [
        entity for entity in response["Entities"]
        if entity["Score"] >= 0.8  # 80% confidence threshold
    ]

This gives us results like:

{
    "Text": "blood pressure",
    "Category": "TEST_TREATMENT_PROCEDURE",
    "Type": "TEST_NAME",
    "Score": 0.97,
    "Traits": [
        {"Name": "VITAL_SIGN", "Score": 0.92}
    ]
}

#2. ICD-10 Code Detection

Next, we look specifically for diagnoses that map to ICD-10 codes:

def extract_icd10_codes(text: str) -> List[Dict[str, Any]]:
    response = comprehend_medical.infer_icd10_cm(Text=text)
    
    # Extract both the diagnosis and its ICD-10 code
    codes = []
    for entity in response["Entities"]:
        concepts = entity.get("ICD10CMConcepts", [])
        # Skip low-confidence entities and entities with no linked concept
        if entity["Score"] >= 0.8 and concepts:
            codes.append({
                "Text": entity["Text"],  # e.g., "type 2 diabetes"
                "ICD10": concepts[0]["Code"],  # first listed concept, e.g., "E11.9"
                "Score": entity["Score"]
            })
    return codes

#3. Medication Code Detection

Finally, we extract medication information and map it to RxNorm codes:

def extract_medications(text: str) -> List[Dict[str, Any]]:
    response = comprehend_medical.infer_rx_norm(Text=text)
    
    medications = []
    for med in response["Entities"]:
        concepts = med.get("RxNormConcepts", [])
        # Skip low-confidence entities and entities with no linked concept
        if med["Score"] >= 0.8 and concepts:
            medications.append({
                "Text": med["Text"],  # e.g., "metformin 500mg"
                "RxNorm": concepts[0]["Code"],  # first listed concept, e.g., "203563"
                "Attributes": {  # Extra info like dosage, frequency
                    attr["Type"]: attr["Text"]
                    for attr in med.get("Attributes", [])
                    if attr["Score"] >= 0.8
                }
            })
    return medications

#Handling the SOAP Note Sections

We process each section of the SOAP note through all three detectors:

def process_soap_sections(soap_note: Dict[str, str]) -> Dict[str, Dict[str, List]]:
    processed_sections = {}
    
    for section_name, text in soap_note.items():
        section_results = {
            "entities": [],
            "diagnoses": [],
            "medications": []
        }
        
        # Process each chunk of the section text
        for chunk in process_medical_text(text):
            # Run all three detectors
            section_results["entities"].extend(detect_medical_entities(chunk))
            section_results["diagnoses"].extend(extract_icd10_codes(chunk))
            section_results["medications"].extend(extract_medications(chunk))
            
        processed_sections[section_name] = section_results
        
    return processed_sections

#Deduplication and Confidence Filtering

Before finalizing our results, we remove duplicates while keeping the highest confidence scores:

def deduplicate_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    unique_entities = {}
    
    for entity in entities:
        entity_key = (entity["Text"].lower(), entity.get("Category", "UNKNOWN"))
        
        if (entity_key not in unique_entities or 
            entity["Score"] > unique_entities[entity_key]["Score"]):
            unique_entities[entity_key] = entity
            
    return list(unique_entities.values())
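A small worked example shows the effect. The function is repeated so the snippet runs on its own, and the sample entities are made up for illustration.

```python
from typing import Any, Dict, List


def deduplicate_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    unique_entities: Dict[tuple, Dict[str, Any]] = {}
    for entity in entities:
        # Case-insensitive text plus category identifies a duplicate.
        entity_key = (entity["Text"].lower(), entity.get("Category", "UNKNOWN"))
        if (entity_key not in unique_entities
                or entity["Score"] > unique_entities[entity_key]["Score"]):
            unique_entities[entity_key] = entity
    return list(unique_entities.values())


# Made-up sample data: the same entity detected twice with different casing.
entities = [
    {"Text": "Blood pressure", "Category": "TEST_TREATMENT_PROCEDURE", "Score": 0.91},
    {"Text": "blood pressure", "Category": "TEST_TREATMENT_PROCEDURE", "Score": 0.97},
    {"Text": "chest pain", "Category": "MEDICAL_CONDITION", "Score": 0.88},
]

unique = deduplicate_entities(entities)
for entity in unique:
    print(entity["Text"], entity["Score"])
# blood pressure 0.97
# chest pain 0.88
```

Because the key includes the category, the same phrase tagged under two different categories is kept twice, which is usually what you want for downstream analysis.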

#Example Output

For a SOAP note section like:

Patient reports increased blood pressure. Currently taking metformin 500mg twice daily 
for type 2 diabetes. Complains of occasional chest pain.

We extract structured data like:

{
    "subjective": {
        "entities": [
            {"Text": "blood pressure", "Category": "TEST_TREATMENT_PROCEDURE"},
            {"Text": "chest pain", "Category": "MEDICAL_CONDITION"}
        ],
        "diagnoses": [
            {
                "Text": "type 2 diabetes",
                "ICD10": "E11.9",
                "Score": 0.95
            }
        ],
        "medications": [
            {
                "Text": "metformin 500mg twice daily",
                "RxNorm": "203563",
                "Attributes": {
                    "DOSAGE": "500mg",
                    "FREQUENCY": "twice daily"
                }
            }
        ]
    }
}

#Error Handling

We wrap each API call with error handling to ensure the pipeline continues even if one detection fails:

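import logging
from typing import Any, Callable, Dict, List

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

def safe_api_call(func: Callable, text: str) -> List[Dict[str, Any]]:
    try:
        return func(text)
    except ClientError as e:
        logger.error("API call failed: %s", e)
        return []  # Return empty list on failure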
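Returning an empty list right away treats a transient failure, such as throttling, the same as a permanent one. One option is to retry a few times with exponential backoff before giving up. This is a sketch under my own assumptions, not something the pipeline above does: `call_with_retry`, its parameters, and the delay values are hypothetical, and it uses only the standard library so the exception types to retry on are passed in by the caller.

```python
import logging
import time
from typing import Any, Callable, Dict, List, Tuple, Type

logger = logging.getLogger(__name__)


def call_with_retry(
    func: Callable[[str], List[Dict[str, Any]]],
    text: str,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Dict[str, Any]]:
    """Call func(text), retrying with exponential backoff; return [] if all attempts fail."""
    for attempt in range(1, attempts + 1):
        try:
            return func(text)
        except retry_on as exc:
            logger.warning("Attempt %d/%d failed: %s", attempt, attempts, exc)
            if attempt < attempts:
                # Delay doubles each time: base_delay, 2 * base_delay, ...
                sleep(base_delay * 2 ** (attempt - 1))
    return []


# Demo with a stand-in detector that fails twice before succeeding.
_calls = {"count": 0}


def flaky_detector(text: str) -> List[Dict[str, Any]]:
    _calls["count"] += 1
    if _calls["count"] < 3:
        raise RuntimeError("simulated throttling")
    return [{"Text": text}]


recorded_delays: List[float] = []
result = call_with_retry(flaky_detector, "metformin", sleep=recorded_delays.append)
print(result, recorded_delays)
# [{'Text': 'metformin'}] [0.5, 1.0]
```

With boto3, `retry_on` would typically be `(ClientError,)`, and the SDK's own built-in retry configuration may already cover part of this.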

#Preparing for Storage

Finally, we format the extracted data for our Iceberg tables:

def prepare_for_storage(processed_sections: Dict[str, Dict[str, List]]) -> Dict[str, Any]:
    return {
        "entities": json.dumps([
            entity 
            for section in processed_sections.values() 
            for entity in section["entities"]
        ]),
        "medical_codes": json.dumps({
            "diagnoses": [
                dx 
                for section in processed_sections.values() 
                for dx in section["diagnoses"]
            ],
            "medications": [
                med 
                for section in processed_sections.values() 
                for med in section["medications"]
            ]
        })
    }

This structured data flows into our Iceberg tables, where we can use Athena's JSON functions to analyze:

  • Diagnosis patterns across patient populations
  • Medication prescribing trends
  • Common symptoms and conditions
  • Treatment effectiveness over time
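Before writing the Athena queries, it can help to prototype an aggregation locally on a handful of rows. The sketch below counts how many records mention each ICD-10 code, assuming each row is a `medical_codes` JSON string shaped like the output of `prepare_for_storage`. The function name and the sample rows are hypothetical.

```python
import json
from collections import Counter
from typing import Iterable


def count_diagnosis_codes(medical_codes_rows: Iterable[str]) -> Counter:
    """Count how many records mention each ICD-10 code."""
    counts: Counter = Counter()
    for row in medical_codes_rows:
        codes = json.loads(row)
        # Use a set so a code repeated within one record is counted once.
        counts.update(
            {dx["ICD10"] for dx in codes.get("diagnoses", []) if dx.get("ICD10")}
        )
    return counts


# Made-up sample rows in the stored JSON shape.
rows = [
    json.dumps({"diagnoses": [{"Text": "type 2 diabetes", "ICD10": "E11.9", "Score": 0.95}],
                "medications": []}),
    json.dumps({"diagnoses": [{"Text": "diabetes", "ICD10": "E11.9", "Score": 0.9},
                              {"Text": "hypertension", "ICD10": "I10", "Score": 0.93}],
                "medications": []}),
]

print(count_diagnosis_codes(rows).most_common())
# [('E11.9', 2), ('I10', 1)]
```

At scale the same aggregation would run in Athena over the Iceberg table rather than in Python.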

#4. Data Storage with Apache Iceberg

A final Lambda function converts complex clinical data into queryable Iceberg tables using the AWS SDK for pandas (awswrangler), a library that integrates pandas with AWS services. The library is easily added via a Lambda layer.

def write_to_iceberg(record: Dict[str, Any], bucket: str, database: str, table: str):
    df = pd.DataFrame([record])
    
    # Store complex objects as queryable JSON. Values that are already JSON
    # strings (e.g. from prepare_for_storage) are kept as-is to avoid double encoding.
    for column in ("soap_note", "medical_codes"):
        df[column] = df[column].apply(
            lambda value: value if isinstance(value, str) else json.dumps(value)
        )
 
    # Define schema for clinical data
    dtype = {
        "external_call_id": "string",
        "processed_at": "timestamp",
        "soap_note": "string",  # JSON containing SOAP sections
        "medical_codes": "string",  # JSON containing extracted codes
    }
 
    # Configure Iceberg table with optimizations
    wr.athena.to_iceberg(
        df=df,
        database=database,
        table=table,
        # Assumed S3 layout: the original listing does not show how `bucket` is used
        table_location=f"s3://{bucket}/{table}/",
        temp_path=f"s3://{bucket}/temp/",
        dtype=dtype,  # Apply the schema defined above
        schema_evolution=True,
        encryption="SSE_S3",
        additional_table_properties={
            "write_target_data_file_size_bytes": "536870912",
            "format-version": "2",
        }
    )

The pipeline stores SOAP notes and medical codes as JSON strings within Iceberg tables, enabling complex queries through Athena's JSON functions. This approach provides several benefits:

  1. Schema Flexibility: New fields can be added to the SOAP notes or medical codes without requiring table alterations
  2. Optimized Storage: Large clinical text is compressed and stored efficiently
  3. SQL Accessibility: Complex nested data becomes queryable through standard SQL:

SELECT 
  external_call_id,
  processed_at,
  -- Extract specific SOAP sections
  JSON_EXTRACT_SCALAR(soap_note, '$.subjective') as subjective,
  JSON_EXTRACT_SCALAR(soap_note, '$.objective') as objective,
  -- Extract and analyze medical codes (keys match those written by prepare_for_storage)
  JSON_EXTRACT(medical_codes, '$.diagnoses') as diagnoses,
  JSON_EXTRACT(medical_codes, '$.medications') as medications
FROM clinical_data.soap_notes
WHERE processed_at > CURRENT_DATE - INTERVAL '30' DAY;

This setup enables downstream analytics while maintaining the original document structure. The Iceberg format also provides ACID transactions and time travel capabilities, facilitating data governance.

#Future Work

I'm currently exploring:

  1. Analytics Enhancements

    • Creating standardized clinical metrics views
    • Building trend analysis pipelines
    • Implementing quality scoring for SOAP notes
  2. Model Improvements

    • Testing specialized medical models for each SOAP section
    • Adding automated quality checks for generated notes
    • Expanding medical code extraction coverage
  3. Data Pipeline Optimization

    • Implementing data partitioning strategies
    • Adding data quality validation steps
    • Building automated testing for the extraction pipeline