Bulk Loading¶
Bulk loading with NodeSet and RelationshipSet is Graphio's high-performance approach for loading large datasets into Neo4j. This guide covers advanced patterns, optimization techniques, and best practices.
Overview¶
Bulk loading is ideal for:
- ETL processes - Loading data from external sources
- Data migration - Moving data between Neo4j instances
- Initial data loading - Populating a fresh database
- Batch processing - Regular data updates from feeds
Key advantages:
- ✅ High performance - Automatic batching (10,000 nodes/relationships per batch)
- ✅ Memory efficient - Processes data in chunks
- ✅ Simple API - Define containers, add data, load
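To make the batching idea concrete, here is a minimal sketch of splitting a stream of records into fixed-size batches. It illustrates the general technique only and is not Graphio's internal implementation; the helper name `batched` and the sample records are hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator


def batched(records: Iterable[dict], batch_size: int = 10_000) -> Iterator[list]:
    """Yield lists of at most batch_size records (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        # Pull the next batch_size items without materializing the whole input
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 25,000 illustrative records arrive as a lazy generator
rows = ({"id": i} for i in range(25_000))
batch_sizes = [len(batch) for batch in batched(rows)]
print(batch_sizes)
```

Because the input is consumed lazily, only one batch is held in memory at a time, which is the property the "memory efficient" bullet refers to.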
NodeSet: Bulk Node Loading¶
Basic Usage¶
from graphio import NodeSet
from neo4j import GraphDatabase
driver = GraphDatabase.driver('neo4j://localhost:7687', auth=('neo4j', 'password'))
# Define container for Person nodes
people = NodeSet(['Person'], merge_keys=['email'])
# Add nodes
people.add({'name': 'Alice Smith', 'email': 'alice@example.com', 'age': 30})
people.add({'name': 'Bob Johnson', 'email': 'bob@example.com', 'age': 25})
# Bulk load to Neo4j
people.create(driver)
Multiple Labels¶
Assign multiple labels to all nodes in a NodeSet:
employees = NodeSet(['Person', 'Employee'], merge_keys=['employee_id'])
employees.add({'name': 'Alice', 'employee_id': 'E001', 'department': 'Engineering'})
Additional Labels¶
Use additional_labels to add extra labels to nodes after a MERGE, without including them in the MERGE pattern. This is useful when multiple datasets produce nodes with the same identity but you want to tag some of them with a more specific label.
# Merge on the shared label
items = NodeSet(['Item'], merge_keys=['id'])
items.add({'id': '1', 'name': 'First'})
items.merge(driver)
# Merge on the same label and key, but add an extra label
special_items = NodeSet(['Item'], merge_keys=['id'], additional_labels=['Featured'])
special_items.add({'id': '1', 'name': 'First', 'priority': 'high'})
special_items.merge(driver)
-- Generated Cypher
MERGE (n:Item { id: properties.id })
ON CREATE SET n = properties
ON MATCH SET n += properties
SET n:Featured
Because additional_labels are applied via SET after the MERGE, they accumulate. If the same node is merged from different NodeSets with different additional_labels, the node ends up with all of them.
Tip
additional_labels also works with create(). In that case, the additional labels are included directly in the CREATE clause.
Merge Keys and Uniqueness¶
Merge keys define node uniqueness for MERGE operations:
# Single merge key
users = NodeSet(['User'], merge_keys=['username'])
# Multiple merge keys (compound uniqueness)
locations = NodeSet(['Location'], merge_keys=['country', 'city'])
locations.add({'country': 'Germany', 'city': 'Munich', 'population': 1500000})
Performance Tip
Always create indexes for merge keys before bulk loading; see Index Strategy under Performance Optimization.
Default Properties¶
Apply properties to all nodes automatically:
# All employees get default company and active properties
employees = NodeSet(
    ['Person', 'Employee'],
    merge_keys=['employee_id'],
    default_props={'company': 'ACME Corp', 'active': True}
)
employees.add({'name': 'Alice', 'employee_id': 'E001'})
# Result: Alice gets company='ACME Corp' and active=True automatically
Handling Duplicates¶
Option 1: Allow duplicates (fastest)
people.add({'name': 'Alice', 'email': 'alice@example.com'})
people.add({'name': 'Alice', 'email': 'alice@example.com'}) # Duplicate allowed
len(people.nodes) # Returns 2
Option 2: Built-in deduplication (fast)
# Enable deduplication at NodeSet creation
people = NodeSet(['Person'], merge_keys=['email'], deduplicate=True)
people.add({'name': 'Alice', 'email': 'alice@example.com'})
people.add({'name': 'Alice', 'email': 'alice@example.com'}) # Skipped automatically
len(people.nodes) # Returns 1
# Override deduplication with force parameter
people.add({'name': 'Alice', 'email': 'alice@example.com'}, force=True) # Added despite duplicate
len(people.nodes) # Returns 2
Performance Note
The built-in deduplicate=True option uses an efficient internal index and is suitable for large datasets.
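The idea behind index-based deduplication can be sketched in a few lines: keep a set of merge-key tuples that have already been seen and skip any record whose tuple is already present. The class below is a toy stand-in written for illustration, not Graphio's NodeSet; the name `DedupBuffer` is hypothetical, and it assumes merge-key values are hashable.

```python
class DedupBuffer:
    """Toy container that skips records with an already-seen merge key."""

    def __init__(self, merge_keys):
        self.merge_keys = list(merge_keys)
        self.nodes = []
        self._seen = set()  # merge-key tuples that were already added

    def add(self, properties, force=False):
        """Add a record; return False if it was skipped as a duplicate."""
        key = tuple(properties.get(k) for k in self.merge_keys)
        if key in self._seen and not force:
            return False
        self._seen.add(key)
        self.nodes.append(dict(properties))
        return True


locations = DedupBuffer(merge_keys=["country", "city"])
locations.add({"country": "Germany", "city": "Munich", "population": 1500000})
locations.add({"country": "Germany", "city": "Berlin", "population": 3700000})
skipped = not locations.add({"country": "Germany", "city": "Munich", "population": 1600000})
count_after_dedup = len(locations.nodes)

# force=True bypasses the duplicate check
locations.add({"country": "Germany", "city": "Munich", "population": 1600000}, force=True)
count_after_force = len(locations.nodes)
print(skipped, count_after_dedup, count_after_force)
```

A set lookup is constant time on average, so the check adds little overhead per record even for large inputs.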
Advanced Deduplication Patterns¶
Mixed deduplication within the same NodeSet:
# Process multiple data sources with different deduplication needs
customers = NodeSet(['Customer'], merge_keys=['email'], deduplicate=True)
# Source A: Clean data, allow deduplication
for record in clean_customer_data:
    customers.add(record)  # Automatically deduplicated
# Source B: Known to have intentional duplicates (e.g., for counting)
for record in duplicate_tracking_data:
    customers.add(record, force=True)  # Force add despite duplicates
Multiple merge keys deduplication:
# Deduplicate on compound keys
locations = NodeSet(['Location'], merge_keys=['country', 'city'], deduplicate=True)
locations.add({'country': 'Germany', 'city': 'Munich', 'population': 1500000})
locations.add({'country': 'Germany', 'city': 'Berlin', 'population': 3700000})
locations.add({'country': 'Germany', 'city': 'Munich', 'population': 1600000}) # Skipped - same country+city
len(locations.nodes) # Returns 2 (Munich and Berlin)
Batch operations with deduplication:
# add_nodes() also respects deduplication settings
people = NodeSet(['Person'], merge_keys=['email'], deduplicate=True)
batch_data = [
    {'name': 'Alice', 'email': 'alice@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
    {'name': 'Alice', 'email': 'alice@example.com'},  # Will be skipped
    {'name': 'Charlie', 'email': 'charlie@example.com'}
]
people.add_nodes(batch_data) # Automatically deduplicates during batch add
len(people.nodes) # Returns 3 (Alice, Bob, Charlie)
RelationshipSet: Bulk Relationship Loading¶
Basic Usage¶
from graphio import RelationshipSet
# Define relationship container
employments = RelationshipSet(
    'WORKS_AT',   # Relationship type
    ['Person'],   # Start node labels
    ['Company'],  # End node labels
    ['email'],    # Start node matching properties
    ['name']      # End node matching properties
)
# Add relationships
employments.add(
    {'email': 'alice@example.com'},  # Start node matcher
    {'name': 'ACME Corp'},           # End node matcher
    {'position': 'Developer', 'since': '2023-01-15'}  # Relationship properties
)
# Bulk load
employments.create(driver)
Multiple Match Properties¶
Match nodes using multiple properties:
# Match locations by country AND city
visits = RelationshipSet(
    'VISITED',
    ['Person'], ['Location'],
    ['email'], ['country', 'city']
)
visits.add(
    {'email': 'alice@example.com'},
    {'country': 'Germany', 'city': 'Munich'},
    {'date': '2023-06-15', 'duration_days': 5}
)
Default Relationship Properties¶
survey_responses = RelationshipSet(
    'RATED', ['Person'], ['Product'],
    ['user_id'], ['product_id'],
    default_props={'survey': 'Q3-2023', 'method': 'online'}
)
# Every relationship gets survey and method automatically
survey_responses.add(
    {'user_id': 'U001'},
    {'product_id': 'P456'},
    {'rating': 5}
)
Append Properties on Relationships¶
Just like NodeSet, RelationshipSet supports append_props to accumulate values in a list across multiple merges. This is useful when the same relationship is merged from different data sources and you want to track all values instead of overwriting.
# Track sources that contributed to each relationship
citations = RelationshipSet(
    'CITES', ['Paper'], ['Paper'],
    ['doi'], ['doi'],
    append_props=['source']
)
citations.add({'doi': '10.1/a'}, {'doi': '10.1/b'}, {'source': 'pubmed', 'score': 0.9})
citations.merge(driver)
# Result: source=['pubmed'], score=0.9
citations2 = RelationshipSet(
    'CITES', ['Paper'], ['Paper'],
    ['doi'], ['doi'],
    append_props=['source']
)
citations2.add({'doi': '10.1/a'}, {'doi': '10.1/b'}, {'source': 'scopus', 'score': 0.95})
citations2.merge(driver)
# Result: source=['pubmed', 'scopus'], score=0.95 (score overwritten, source appended)
APOC Required
append_props on RelationshipSet uses apoc.map.removeKeys(), same as NodeSet.
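The append-versus-overwrite behavior described in the comments above can be modeled in plain Python. This is only a sketch of the observable semantics, not the Cypher or APOC calls Graphio runs; the function name `merge_properties` is hypothetical, and the sketch assumes values are appended unconditionally (whether Graphio skips values that are already present is not stated in this guide).

```python
def merge_properties(existing, incoming, append_props=()):
    """Model a merge: listed properties accumulate in a list, others are overwritten."""
    merged = dict(existing)
    for key, value in incoming.items():
        if key in append_props:
            current = merged.get(key, [])
            if not isinstance(current, list):
                current = [current]  # promote a scalar to a one-element list
            merged[key] = current + [value]
        else:
            merged[key] = value  # plain overwrite
    return merged


# First merge creates the relationship, second merge updates it
rel = merge_properties({}, {"source": "pubmed", "score": 0.9}, append_props=["source"])
first = dict(rel)
rel = merge_properties(rel, {"source": "scopus", "score": 0.95}, append_props=["source"])
print(first)
print(rel)
```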
Performance Optimization¶
Index Strategy¶
Always create indexes before bulk loading:
# For NodeSets
people.create_index(driver) # Creates index on email
companies.create_index(driver) # Creates index on name
# For RelationshipSets
employments.create_index(driver) # Creates indexes on both Person(email) and Company(name)
Compound indexes for multiple merge keys:
locations = NodeSet(['Location'], merge_keys=['country', 'city'])
locations.create_index(driver) # Creates compound index on (country, city)
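For readers who want to see what such an index looks like in Cypher, the sketch below builds a standard Neo4j `CREATE INDEX` statement (Neo4j 4.1+ syntax) for one label and one or more properties. The exact statement that `create_index()` emits is not shown in this guide, so treat this as an equivalent written by hand; the function name `index_statement` is hypothetical.

```python
def index_statement(label, properties):
    """Build a CREATE INDEX statement for a single label (hypothetical helper)."""
    if not properties:
        raise ValueError("at least one property is required")
    # Backticks quote identifiers so unusual label or property names stay valid
    props = ", ".join(f"n.`{prop}`" for prop in properties)
    return f"CREATE INDEX IF NOT EXISTS FOR (n:`{label}`) ON ({props})"


single = index_statement("Person", ["email"])
compound = index_statement("Location", ["country", "city"])
print(single)
print(compound)
```

The resulting string can be run with the official driver, for example inside `with driver.session() as session: session.run(compound)`.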
Loading Order Matters¶
Load nodes before relationships:
# ✅ Correct order
companies.create(driver) # Load companies first
people.create(driver) # Then people
employments.create(driver) # Finally relationships
# ❌ Wrong order - relationships are silently skipped when their nodes don't exist yet
employments.create(driver) # Relationships first - BAD!
people.create(driver)
companies.create(driver)
Create vs Merge Operations¶
Create Operation¶
- Speed: Fastest - no uniqueness checks
- Use case: Fresh databases, known unique data
- Behavior: Always creates new nodes/relationships
Merge Operation¶
- Speed: Slower - checks for existing data
- Use case: Updating existing data, uncertain about duplicates
- Behavior: Updates existing, creates new
Advanced Merge Options¶
APOC Required
The preserve and append_props features require the APOC library to be installed in your Neo4j database. These features use apoc.map.removeKeys() and other APOC utilities.
Preserve specific properties during merge:
# Don't overwrite 'created_date' and 'original_source' on existing nodes
people.merge(driver, preserve=['created_date', 'original_source'])
# Equivalent Cypher behavior:
# ON CREATE SET n += properties
# ON MATCH SET n += apoc.map.removeKeys(properties, ['created_date', 'original_source'])
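The effect of `preserve` can be modeled in plain Python to make the two branches explicit. This sketch mirrors the Cypher comments above (create: set everything; match: update everything except the preserved keys) and is not Graphio's implementation; the function name `merge_with_preserve` is hypothetical.

```python
def merge_with_preserve(existing, incoming, preserve=()):
    """Model MERGE with preserved keys; existing=None means the node is new."""
    if existing is None:
        return dict(incoming)  # ON CREATE: all properties are written
    # ON MATCH: drop preserved keys from the update, like apoc.map.removeKeys
    updates = {k: v for k, v in incoming.items() if k not in preserve}
    return {**existing, **updates}


stored = {"email": "alice@example.com", "name": "Alice", "created_date": "2023-01-01"}
update = {"email": "alice@example.com", "name": "Alice Smith", "created_date": "2024-06-01"}

matched = merge_with_preserve(stored, update, preserve=["created_date"])
created = merge_with_preserve(None, update, preserve=["created_date"])
print(matched)
print(created)
```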
Append to array properties (works on both NodeSet and RelationshipSet):
# Append to 'tags' array instead of replacing
articles = NodeSet(['Article'], merge_keys=['id'], append_props=['tags'])
articles.merge(driver)
# If node exists with tags=['tech'], and new node has tags=['python']
# Result: tags=['tech', 'python']
Set merge behavior at creation:
# Define merge behavior when creating NodeSet
users = NodeSet(
    ['User'], ['username'],
    preserve=['registration_date'],
    append_props=['login_history']
)
Error Handling and Validation¶
Connection Errors¶
from neo4j.exceptions import ServiceUnavailable
try:
    people.create(driver)
except ServiceUnavailable:
    print("Neo4j database is not available")
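When the outage is expected to be short, retrying with a growing delay is a common alternative to giving up on the first failure. The helper below is a generic sketch, not part of Graphio or the Neo4j driver; the name `retry`, its parameters, and the stand-in `flaky_load` function are all hypothetical.

```python
import time


def retry(operation, retry_on=(Exception,), attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call operation(); on a listed exception, wait and try again."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))  # 1x, 2x, 4x ... the base delay


# Demonstration with a stand-in operation that fails twice before succeeding
calls = {"count": 0}
delays = []


def flaky_load():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("database not reachable")
    return "loaded"


# delays.append replaces time.sleep so the demo records waits instead of sleeping
result = retry(flaky_load, retry_on=(ConnectionError,), base_delay=0.5, sleep=delays.append)
print(result, delays)
```

With the driver, the call would look like `retry(lambda: people.merge(driver), retry_on=(ServiceUnavailable,))`. Retrying `merge()` is the safer choice here, since re-running `create()` after a partially completed load may produce duplicate nodes.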
Data Validation¶
def validate_email(email):
    return '@' in email
# Validate data before adding
for person_data in source_data:
    if validate_email(person_data.get('email', '')):
        people.add(person_data)
    else:
        print(f"Invalid email: {person_data}")
Missing Reference Nodes¶
# RelationshipSets don't validate that referenced nodes exist
# This relationship will be silently ignored if nodes don't exist
employments.add(
    {'email': 'nonexistent@example.com'},  # Node doesn't exist
    {'name': 'ACME Corp'},
    {'position': 'Developer'}
)
Important
Graphio does not validate that nodes referenced in relationships actually exist. Ensure proper loading order: nodes before relationships.
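Since missing endpoints are not reported, one option is to check relationship matchers against the node data you are about to load. The sketch below works on plain dictionaries and only sees in-memory data, so it cannot know about nodes that already exist in the database; the function name `find_dangling` and the sample data are hypothetical.

```python
def find_dangling(relationships, start_nodes, end_nodes, start_keys, end_keys):
    """Return (start, end) matcher pairs whose endpoints are missing from the node lists."""

    def key_of(props, keys):
        return tuple(props.get(k) for k in keys)

    known_starts = {key_of(node, start_keys) for node in start_nodes}
    known_ends = {key_of(node, end_keys) for node in end_nodes}
    return [
        (start, end)
        for start, end in relationships
        if key_of(start, start_keys) not in known_starts
        or key_of(end, end_keys) not in known_ends
    ]


people_data = [{"name": "Alice Smith", "email": "alice@example.com"}]
company_data = [{"name": "ACME Corp"}]
employment_data = [
    ({"email": "alice@example.com"}, {"name": "ACME Corp"}),
    ({"email": "nonexistent@example.com"}, {"name": "ACME Corp"}),
]

dangling = find_dangling(employment_data, people_data, company_data, ["email"], ["name"])
print(dangling)
```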
Real-World Example: ETL Pipeline¶
from graphio import NodeSet, RelationshipSet
from neo4j import GraphDatabase
import pandas as pd
def load_employee_data(csv_file, driver):
    # Read source data
    df = pd.read_csv(csv_file)
    # Create containers with built-in deduplication
    employees = NodeSet(['Person', 'Employee'], merge_keys=['employee_id'])
    departments = NodeSet(['Department'], merge_keys=['name'], deduplicate=True)  # Deduplicate departments
    works_in = RelationshipSet('WORKS_IN', ['Employee'], ['Department'],
                               ['employee_id'], ['name'])
    # Process data
    for _, row in df.iterrows():
        # Add employee
        employees.add({
            'employee_id': row['id'],
            'name': row['full_name'],
            'email': row['email'],
            'hire_date': row['start_date']
        })
        # Add department - automatically deduplicated
        departments.add({
            'name': row['department'],
            'budget': row['dept_budget']
        })
        # Add relationship
        works_in.add(
            {'employee_id': row['id']},
            {'name': row['department']},
            {'start_date': row['start_date']}
        )
    # Create indexes for performance
    employees.create_index(driver)
    departments.create_index(driver)
    # Load data in correct order
    departments.merge(driver)  # Departments first
    employees.merge(driver)    # Then employees
    works_in.merge(driver)     # Finally relationships
    print(f"Loaded {len(employees.nodes)} employees into {len(departments.nodes)} departments")
# Usage
driver = GraphDatabase.driver('neo4j://localhost:7687', auth=('neo4j', 'password'))
load_employee_data('employees.csv', driver)
This example demonstrates:
- Reading from external data source
- Processing rows with built-in deduplication of departments
- Using appropriate merge strategies
- Creating indexes for performance
- Loading in correct order
Best Practices Summary¶
- 🚀 Performance
  - Create indexes before bulk loading
  - Load nodes before relationships
  - Use create() for fresh data, merge() for updates
- 💾 Memory Management
  - Process very large datasets in chunks
  - Use deduplicate=True for automatic duplicate prevention
- 🔍 Data Quality
  - Validate data before adding to containers
  - Handle missing reference nodes appropriately
  - Use proper merge keys for your domain
- ⚡ Optimization
  - Batch related operations together
  - Use default properties to reduce repetitive data
  - Monitor loading progress for large datasets
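For the progress-monitoring point, a small generator wrapper is often enough: it passes records through unchanged and reports a running count at a fixed interval. This is a generic sketch and not a Graphio feature; the name `with_progress` and its parameters are hypothetical.

```python
def with_progress(records, every=10_000, report=print):
    """Yield records unchanged, reporting the running count every `every` items."""
    count = 0
    for record in records:
        yield record
        count += 1
        if count % every == 0:
            report(f"{count} records processed")
    report(f"done: {count} records processed")


# Demonstration: collect the messages instead of printing them
messages = []
source = ({"id": i} for i in range(25))
consumed = [record["id"] for record in with_progress(source, every=10, report=messages.append)]
print(messages)
```

In a loading script the wrapper would sit around the data source, for example `for record in with_progress(source_data): people.add(record)`.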
Integration with OGM¶
Bulk loading works seamlessly with OGM models. Use OGM to define your data structure and validation, then leverage bulk loading for high-performance operations.
Why Combine OGM + Bulk Loading?¶
- OGM provides: Structure, validation, type safety, intuitive queries
- Bulk loading provides: High performance, memory efficiency, automatic batching
- Together: Best developer experience with maximum performance
Quick Integration Example¶
from graphio import NodeModel, NodeSet, Base
# Define structure with OGM
class Employee(NodeModel):
    _labels = ['Person', 'Employee']
    _merge_keys = ['employee_id']
    name: str
    employee_id: str
    email: str
    department: str
# Set up indexes using OGM
Base.set_driver(driver)
Base.create_indexes()
# Get bulk container directly from OGM model
employees = Employee.dataset() # Automatically uses Employee's configuration
# Validate with OGM, load with bulk
import pandas as pd
df = pd.read_csv('employees.csv')
for _, row in df.iterrows():
    # Validate data structure with OGM
    emp_data = {
        'name': row['name'],
        'employee_id': row['id'],
        'email': row['email'],
        'department': row['dept']
    }
    employee = Employee(**emp_data)  # Validates or raises error
    # Add to bulk container
    employees.add(employee.model_dump())
# Bulk load validated data
employees.create(driver)
# Query with OGM convenience
tech_employees = Employee.match(Employee.department == 'Technology').all()
Integration Patterns¶
Pattern 1: Validation-First Loading¶
# Assumes NodeModel is Pydantic-based, as the use of model_dump() suggests
from pydantic import ValidationError

def validated_bulk_load(model_class, data_source, driver):
    """Generic function to bulk load any OGM model with validation"""
    # Create bulk container using dataset() method
    bulk_container = model_class.dataset()
    valid_count = 0
    error_count = 0
    for record in data_source:
        try:
            # Validate with OGM model
            instance = model_class(**record)
            bulk_container.add(instance.model_dump())
            valid_count += 1
        except ValidationError as e:
            print(f"Skipping invalid record: {e}")
            error_count += 1
    # Bulk load validated data
    bulk_container.create(driver)
    return valid_count, error_count
# Usage with any model
class Product(NodeModel):
    _labels = ['Product']
    _merge_keys = ['sku']
    sku: str
    name: str
    price: float
loaded, errors = validated_bulk_load(Product, product_data, driver)
Pattern 2: Mixed Operations¶
# Use different approaches for different operations
from datetime import datetime
import pandas as pd

class InventoryManager:
    def __init__(self, driver):
        self.driver = driver
        Base.set_driver(driver)

    def daily_import(self, csv_file):
        """Bulk load daily inventory updates"""
        products = Product.dataset()  # Use Product model's dataset
        df = pd.read_csv(csv_file)
        for _, row in df.iterrows():
            products.add({
                'sku': row['sku'],
                'stock_count': row['stock'],
                'last_updated': datetime.now()
            })
        products.merge(self.driver)  # Update existing products
        return len(products.nodes)

    def price_adjustment(self, sku: str, new_price: float):
        """Individual price update using OGM"""
        product = Product.match(Product.sku == sku).first()
        if product:
            product.price = new_price
            product.merge()
            return True
        return False
Performance Benefits¶
When you combine OGM + bulk loading effectively:
# ❌ Slow: Individual OGM operations for bulk data
for record in large_dataset:  # 10,000 records
    person = Person(**record)
    person.merge()  # 10,000 individual database calls!
# ✅ Fast: Validate with OGM, load with bulk
people = Person.dataset()  # Use dataset() method
for record in large_dataset:  # 10,000 records
    person = Person(**record)  # Validate locally
    people.add(person.model_dump())  # Add to batch
people.create(driver)  # Single efficient bulk operation
Real-World Integration: Data Pipeline¶
class ETLPipeline:
    def __init__(self, driver):
        self.driver = driver
        Base.set_driver(driver)
        Base.create_indexes()  # Create all OGM model indexes

    def process_customers(self, customer_file):
        """Load customer data with validation"""
        customers = Customer.dataset()  # Use Customer model's dataset
        addresses = Address.dataset()  # Use Address model's dataset
        lives_at = Customer.lives_at.dataset()  # Use relationship's dataset
        df = pd.read_csv(customer_file)
        for _, row in df.iterrows():
            # Validate customer data
            customer_data = {
                'email': row['email'],
                'name': row['name'],
                'phone': row['phone']
            }
            Customer(**customer_data)  # Validates
            # Validate address data
            address_data = {
                'address_id': row['address_id'],
                'street': row['street'],
                'city': row['city'],
                'country': row['country']
            }
            Address(**address_data)  # Validates
            # Add to bulk containers
            customers.add(customer_data)
            addresses.add(address_data)
            lives_at.add(
                {'email': row['email']},
                {'address_id': row['address_id']},
                {'since': row['move_in_date']}
            )
        # Load in correct order
        addresses.create(self.driver)
        customers.create(self.driver)
        lives_at.create(self.driver)
        return len(customers.nodes)
    def customer_analytics(self):
        """Use OGM for complex analytics after bulk loading"""
        from graphio.ogm.model import CypherQuery
        # Find customers in major cities
        # Complex relationship queries require custom Cypher
        # (assumes the Address model uses the label 'Address')
        major_city_query = CypherQuery("""
            MATCH (c:Customer)-[:LIVES_AT]->(a:Address)
            WHERE a.city IN ['London', 'Paris', 'Berlin']
            RETURN DISTINCT c
        """)
        major_city_customers = Customer.match(major_city_query).all()
        return [c.name for c in major_city_customers]
# Usage
pipeline = ETLPipeline(driver)
loaded = pipeline.process_customers('customers.csv') # Bulk + validation
analytics = pipeline.customer_analytics() # OGM queries
Integration Best Practices¶
- Use OGM for structure definition
- Leverage OGM validation
- Use registry for index management
- Choose the right approach for each task
This hybrid approach gives you the best of both worlds: the structure and developer experience of OGM with the performance of bulk loading.
For complete OGM documentation, see the OGM Guide.