Schema, metadata, and connections—unified for lean data teams working across BigQuery, Snowflake, Redshift, SQL Server, and PostgreSQL.
pip install schema-mapper[all]
One canonical approach replaces fragmented tooling
A proven 4-stage Extract-Load pipeline
Profile: Understand data quality, detect anomalies, and identify issues before loading.
Clean: Standardize formats, fix errors, handle missing values, and validate data.
Generate: Create optimized schemas with the right types, modes, and platform-specific features.
Export: Produce DDL, JSON schemas, or CLI commands ready for your target platform.
Whether you're migrating clouds, building pipelines, or prototyping architectures
Building multi-cloud data pipelines at scale
Accelerating dbt workflows and analytics delivery
ML preprocessing and feature engineering workflows (v1.3.0)
Standardizing data practices across the organization
DataFrame-first API, Enhanced Database Discovery, and ML Feature Engineering: query once, analyze anywhere, DataFrames everywhere
All database queries return pandas DataFrames. Consistent API across all 5 platforms.
Programmatically explore your entire data warehouse with introspection methods.
Intelligent feature analysis and automated preprocessing for machine learning workflows.
Customizable plotting with sensible defaults for data profiling and analysis.
Moving from Redshift to Snowflake? Replatforming to BigQuery? Generate platform-specific schemas instantly without rewriting everything.
Stop hardcoding schemas in your ETL. Auto-detect types, clean column names, validate data quality — all before you load.
Generate DDL with clustering, partitioning, and distribution strategies baked in. Platform-optimized out of the box.
Test data architectures across platforms in minutes. Compare BigQuery vs Snowflake performance before you commit.
Single API for BigQuery, Snowflake, Redshift, SQL Server, PostgreSQL. One codebase, zero lock-in.
YAML-driven schemas with built-in governance. Auto-generate data dictionaries and documentation.
UPSERT, SCD Type 2, CDC patterns. Platform-optimized MERGE statements for all databases.
Thread-safe pools with health checks. Built for high-concurrency production workloads.
Quality scores, anomaly detection, pattern recognition. Understand data before you load it.
Standardization, validation, cleaning, deduplication. Reproducible preprocessing pipelines.
Target correlation analysis and auto-encoding. Feature importance identification for ML workflows.
Introspect warehouse structure programmatically. Export table inventories with metadata.
Platform-agnostic schema format. Define once, render to any database on demand.
Auto-detect dates, numbers, booleans. No manual type casting or schema guesswork.
Built-in PII flags and metadata validation. Compliance-ready from day one.
Production-ready CREATE TABLE with clustering, partitioning, and optimization baked in.
Exponential backoff for transient errors. Built-in resilience for production pipelines (a generic sketch of the pattern follows this feature list).
Context managers and savepoints. Atomic operations across all supported platforms.
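The retry behavior mentioned above follows a standard pattern; here is a minimal, generic sketch of exponential backoff with jitter in plain Python (illustrative only, not schema-mapper's internal implementation):

import random
import time

def with_backoff(operation, max_attempts=5, base_delay=1.0):
    """Retry a callable on transient connection errors, doubling the wait each time."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # Out of retries: surface the error
            # Exponential backoff with jitter: ~1s, 2s, 4s, ...
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.5))

# Usage: wrap any flaky call, e.g. a warehouse load
# with_backoff(lambda: conn.load_dataframe(df, 'events', 'analytics'))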
Infer once. Render everywhere. Your single source of truth.
No vendor lock-in — switch databases without rewriting schemas
Consistent type mapping across all platforms
Extensible renderer pattern — add new databases easily
pip install schema-mapper
Core package only — no platform dependencies
pip install schema-mapper[bigquery]
pip install schema-mapper[snowflake]
pip install schema-mapper[redshift]
Install only what you need
pip install schema-mapper[all]
Everything included — profiling, preprocessing, visualization, all platforms
From DataFrame to production database in one unified API
from schema_mapper import infer_canonical_schema
from schema_mapper.connections import ConnectionFactory, ConnectionConfig
import pandas as pd
# 1. Infer schema from DataFrame
df = pd.read_csv('events.csv')
schema = infer_canonical_schema(
    df,
    table_name='events',
    dataset_name='analytics',
    partition_columns=['event_date'],
    cluster_columns=['user_id', 'event_type']
)
# 2. Add metadata (governance ready!)
schema.description = "User interaction events from web and mobile"
schema.owner = "analytics-team"
schema.columns[0].description = "Unique event identifier"
schema.columns[0].pii = False # PII governance flag
# 3. Connect to ANY database with unified API
config = ConnectionConfig('config/connections.yaml')
with ConnectionFactory.get_connection('bigquery', config) as conn:
    # Test connection
    if conn.test_connection():
        print("Connected!")

    # Create table from canonical schema
    conn.create_table_from_schema(schema, if_not_exists=True)

    # Load data
    conn.load_dataframe(df, schema.table_name, schema.dataset_name)
# Switch platforms? Just change 'bigquery' to 'snowflake'!
# Same code works for all 5 platforms.
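Because the connection API is uniform, targeting another warehouse is just a different key. A minimal sketch that reuses only the calls shown above, assuming each platform has an entry in connections.yaml:

# Deploy the same canonical schema to several warehouses in one pass
for platform in ['bigquery', 'snowflake', 'postgresql']:
    with ConnectionFactory.get_connection(platform, config) as conn:
        conn.create_table_from_schema(schema, if_not_exists=True)
        conn.load_dataframe(df, schema.table_name, schema.dataset_name)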
from schema_mapper import load_schema_from_yaml, save_schema_to_yaml
# Save schema + metadata to YAML (version control this!)
save_schema_to_yaml(schema, 'schemas/events.yaml')
# Load schema in deployment pipeline
schema = load_schema_from_yaml('schemas/events.yaml')
# Validate metadata completeness (CI/CD ready)
errors = schema.validate_metadata(
    required_table_fields=['description', 'owner'],
    required_column_fields=['description', 'pii']
)
if errors:
    raise ValueError(f"Metadata validation failed: {errors}")
# Export data dictionary (Markdown, CSV, JSON)
markdown = schema.export_data_dictionary('markdown')
with open('docs/events_dictionary.md', 'w') as f:
    f.write(markdown)
# Result: Schema + metadata as single source of truth!
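In CI, the same validation can run as an ordinary test so incomplete metadata fails the build. A sketch using pytest (the test layout is an assumption; the validate_metadata call is the one shown above):

# tests/test_schema_metadata.py
from schema_mapper import load_schema_from_yaml

def test_events_schema_has_required_metadata():
    schema = load_schema_from_yaml('schemas/events.yaml')
    errors = schema.validate_metadata(
        required_table_fields=['description', 'owner'],
        required_column_fields=['description', 'pii']
    )
    assert not errors, f"Metadata validation failed: {errors}"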
from schema_mapper import Profiler, PreProcessor, SchemaMapper
import pandas as pd
# Load your data
df = pd.read_csv('customer_data.csv')
# Step 1: PROFILE - Understand your data
profiler = Profiler(df)
quality = profiler.assess_quality()
print(f"Quality Score: {quality['overall_score']}/100")
anomalies = profiler.detect_anomalies()
print(f"Found {len(anomalies)} columns with outliers")
# Visualize (no matplotlib imports needed!)
profiler.plot_distributions()
profiler.plot_correlations()
# Step 2: CLEAN - Preprocess and standardize
preprocessor = PreProcessor(df)
df_clean = (preprocessor
    .fix_whitespace()
    .standardize_column_names()
    .standardize_dates(['created_at'])
    .validate_emails('email', fix=True)
    .remove_duplicates(subset=['user_id'])
    .handle_missing_values(strategy='auto')
    .apply()
)
# Step 3: GENERATE - Create optimized schema
mapper = SchemaMapper('bigquery')
schema, mapping = mapper.generate_schema(df_clean)
ddl = mapper.generate_ddl(df_clean, 'customers', 'analytics', 'my-project')
# Step 4: EXPORT - Save artifacts
df_clean.to_csv('customers_clean.csv', index=False)
with open('schema.json', 'w') as f:
    f.write(mapper.generate_bigquery_schema_json(df_clean))
with open('create_table.sql', 'w') as f:
    f.write(ddl)
print("Ready to load!")
from schema_mapper import prepare_for_load
import pandas as pd
# Load your messy data
df = pd.read_csv('messy_data.csv')
# Prepare for ANY platform in one line!
df_clean, schema, issues = prepare_for_load(
    df,
    target_type='bigquery',  # or 'snowflake', 'redshift', 'sqlserver', 'postgresql'
)
# Check for issues
if not issues['errors']:
    print(f"SUCCESS: {len(schema)} columns prepared and ready to load!")
else:
    print("ERROR: Fix these errors:", issues['errors'])
from schema_mapper import Profiler, PreProcessor
import pandas as pd
# Load your data
df = pd.read_csv('customer_churn.csv')
# 1. ANALYZE: Find most important features for your target
profiler = Profiler(df, name='churn_analysis')
# Automatic target correlation analysis (handles categorical targets!)
feature_importance = profiler.analyze_target_correlation(
    target_column='churn',  # Works with both categorical and numeric targets
    method='pearson',
    top_n=15
)
print(feature_importance)
#    feature          correlation  abs_correlation
# 0  tenure_months         -0.352            0.352
# 1  monthly_charges        0.298            0.298
# 2  support_tickets        0.245            0.245
# 2. VISUALIZE: Feature importance chart (one line!)
fig = profiler.plot_target_correlation('churn', top_n=15)
fig.savefig('feature_importance.png', dpi=300, bbox_inches='tight')
# 3. PREPROCESS: Auto-detect and encode categorical columns
preprocessor = PreProcessor(df)
preprocessor.auto_encode_categorical(
    exclude_columns=['churn', 'customer_id'],  # Don't encode target/ID
    max_categories=10,                         # Only encode low-cardinality
    drop_first=True                            # Avoid multicollinearity
)
# ML-ready dataset!
print(f"Ready for ML: {preprocessor.df.shape}")
# Original: (10000, 15) → After encoding: (10000, 45)
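From here the encoded frame drops straight into a standard scikit-learn workflow. A hedged sketch (column names follow the example above; scikit-learn is not a schema-mapper dependency):

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

ml_df = preprocessor.df
X = ml_df.drop(columns=['churn', 'customer_id'])
y = ml_df['churn']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
model = RandomForestClassifier(n_estimators=200, random_state=42)
model.fit(X_train, y_train)
print(f"Holdout accuracy: {model.score(X_test, y_test):.3f}")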
from schema_mapper.connections import ConnectionFactory, ConnectionConfig
# Connect to any database
config = ConnectionConfig('config/connections.yaml')
with ConnectionFactory.get_connection('bigquery', config) as conn:
    # 1. Get all tables with metadata (DataFrames everywhere!)
    tables = conn.get_tables(schema_name='analytics')
    print(tables)
    #    table_name table_type  num_rows  size_mb              created
    # 0       users      TABLE    150000    245.5  2024-01-01 10:00:00
    # 1      events      TABLE   5000000   8920.3  2024-01-05 11:00:00

    # Find large tables
    large_tables = tables[tables['size_mb'] > 1000]

    # 2. Get all schemas/datasets
    datasets = conn.get_datasets()
    print(f"Found {len(datasets)} datasets")

    # 3. Get complete warehouse structure
    tree = conn.get_database_tree(format='dataframe')
    tree.to_csv('warehouse_inventory.csv')

    # Or as JSON for documentation
    tree_dict = conn.get_database_tree(format='dict')
# Same code works for all 5 platforms!
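The dict form can go straight to JSON for documentation or for diffing warehouse structure between environments; a minimal sketch using the tree_dict from above (standard library only):

import json

# Persist the warehouse tree as JSON documentation
with open('docs/warehouse_tree.json', 'w') as f:
    json.dump(tree_dict, f, indent=2, default=str)  # default=str covers timestamps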
from schema_mapper.canonical import infer_canonical_schema
from schema_mapper.renderers import RendererFactory
# Step 1: Infer canonical schema once
canonical = infer_canonical_schema(
    df,
    table_name='events',
    dataset_name='analytics',
    partition_columns=['event_date'],
    cluster_columns=['user_id']
)
# Step 2: Render to any platform
for platform in ['bigquery', 'snowflake', 'redshift']:
    renderer = RendererFactory.get_renderer(platform, canonical)
    ddl = renderer.to_ddl()
    print(f"{platform.upper()} DDL:")
    print(ddl)
    print()
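A common follow-up is committing one rendered DDL file per platform to version control; a small sketch using only the renderer calls shown above plus pathlib:

from pathlib import Path

out_dir = Path('ddl')
out_dir.mkdir(exist_ok=True)
for platform in ['bigquery', 'snowflake', 'redshift', 'sqlserver', 'postgresql']:
    renderer = RendererFactory.get_renderer(platform, canonical)
    # One file per platform, e.g. ddl/events_bigquery.sql
    (out_dir / f'events_{platform}.sql').write_text(renderer.to_ddl())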
from schema_mapper.generators_enhanced import get_enhanced_ddl_generator
from schema_mapper.ddl_mappings import DDLOptions, ClusteringConfig, PartitionConfig, PartitionType
# BigQuery: Partitioned by date, clustered by user_id
generator = get_enhanced_ddl_generator('bigquery')
options = DDLOptions(
    partitioning=PartitionConfig(
        column='event_date',
        partition_type=PartitionType.TIME,
        expiration_days=365
    ),
    clustering=ClusteringConfig(columns=['user_id', 'event_type'])
)
# Generate optimized DDL
ddl = generator.generate(
    schema=schema,
    table_name='events',
    dataset_name='analytics',
    project_id='my-project',
    ddl_options=options
)
# Result: CREATE TABLE with PARTITION BY and CLUSTER BY
Automatic optimization based on what each database supports
| Feature | BigQuery | Snowflake | Redshift | SQL Server | PostgreSQL |
|---|---|---|---|---|---|
| Partitioning | DATE/TIMESTAMP/RANGE | Automatic (micro-partitions) | Not Supported | Partition function + scheme | RANGE/LIST/HASH |
| Clustering | Up to 4 cols | Up to 4 cols | Not Supported | Clustered Index | Via Indexes |
| Distribution | Not Supported | Not Supported | KEY/ALL/EVEN/AUTO | Not Supported | Not Supported |
| Sort Keys | Not Supported | Not Supported | Compound/Interleaved | Not Supported | Not Supported |
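Per the matrix, Snowflake needs no explicit partition clause (micro-partitioning is automatic), so its DDL options typically carry only clustering keys. A hedged sketch reusing the classes from the example above (omitting project_id for a non-BigQuery target is an assumption):

# Snowflake: clustering keys only; partitioning is handled automatically
sf_generator = get_enhanced_ddl_generator('snowflake')
sf_options = DDLOptions(
    clustering=ClusteringConfig(columns=['user_id', 'event_type'])
)
sf_ddl = sf_generator.generate(
    schema=schema,
    table_name='events',
    dataset_name='analytics',
    ddl_options=sf_options
)
# Expected: CREATE TABLE ... CLUSTER BY (user_id, event_type)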
The only tool that combines profiling, cleaning, and schema generation
Great Expectations excels at validation, but schema-mapper adds profiling, cleaning, and multi-platform schema generation in one package.
dbt handles transformation (the "T" in ELT), while schema-mapper handles Extract-Load preparation. They complement each other perfectly.
Pandas Profiling generates reports, but schema-mapper goes beyond reporting to actually clean, validate, and prepare data for loading.
Commercial ETL tools work great but lock you in. schema-mapper gives you programmatic control, open source freedom, and multi-platform support.
schema-mapper complements your existing data stack
Use schema-mapper for Extract-Load preparation, then dbt for transformations. Perfect ELT pattern.
Integrate schema-mapper tasks into your orchestration pipelines for automated data preparation (see the sketch after this list).
Profile and clean data interactively before analysis or modeling in notebooks.
Use schema-mapper for preparation and schema generation, Great Expectations for production validation.
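As a concrete example of the orchestration integration, here is a hedged sketch of an Airflow TaskFlow task wrapping prepare_for_load (Airflow 2.x and the file paths are assumptions; any orchestrator that can run Python works the same way):

import pandas as pd
import pendulum
from airflow.decorators import dag, task

from schema_mapper import prepare_for_load

@dag(schedule='@daily', start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def prepare_events():
    @task
    def profile_and_clean(path: str) -> str:
        # Profile, clean, and type the raw extract before any load step
        df = pd.read_csv(path)
        df_clean, schema, issues = prepare_for_load(df, target_type='bigquery')
        if issues['errors']:
            raise ValueError(f"Data preparation failed: {issues['errors']}")
        out_path = '/tmp/events_clean.csv'
        df_clean.to_csv(out_path, index=False)
        return out_path

    profile_and_clean('/data/raw/events.csv')

prepare_events()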
Join lean data teams shipping production pipelines across BigQuery, Snowflake, Redshift, SQL Server, and PostgreSQL
Open source • MIT License • Built for lean data teams • Schema + metadata + connections unified