Managing enrichers
A quick-start guide to creating enrichers for your OSINT investigations.
Understanding enrichers
Enrichers are the high-level business logic layer in Flowsint. While types define data structures and tools wrap external utilities, enrichers orchestrate the entire intelligence gathering workflow. An enricher takes input data of one type, processes it through various tools or APIs, validates and enriches the results, creates graph database nodes and relationships, and returns structured output.
Every enricher in Flowsint follows a two-phase execution model. The scan phase contains the core enrichment logic: tools are executed, APIs are called, and data is gathered. The postprocessing phase then creates Neo4j graph nodes and relationships and returns the processed results. Input validation and normalization happen automatically through Pydantic type validation before the scan phase begins.
Enrichers differ from tools in several fundamental ways. Tools are low-level wrappers that return raw data without knowledge of the Flowsint ecosystem. Enrichers are high-level workflows that understand types, create graph nodes, handle parameters, and orchestrate multiple tools. When you want to add a new data source, you create a tool. When you want to add a new intelligence workflow, you create an enricher.
Enricher architecture
The enricher system is built around an abstract base class that defines the interface and execution flow. Every enricher you create inherits from this base class and implements specific methods.
The Enricher base class
The base class lives at flowsint-core/src/flowsint_core/core/enricher_base.py and provides the framework for all enrichers. Here's what a minimal enricher looks like:
from typing import List
from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_types import Domain, Ip

@flowsint_enricher
class MyEnricher(Enricher):
    """Description of what this enricher does."""

    # Define input and output types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        """Unique identifier for this enricher."""
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        """Category this enricher belongs to."""
        return "Domain"

    @classmethod
    def key(cls) -> str:
        """Primary key field name for this enricher."""
        return "domain"

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Core enriching logic."""
        pass

    def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
        """Create graph nodes and relationships."""
        pass

# Export types for easy access
InputType = MyEnricher.InputType
OutputType = MyEnricher.OutputType
The InputType and OutputType class attributes define what data the enricher accepts and returns. They should be Pydantic types, typically from the flowsint-types package, declared as base types (e.g., Domain, not List[Domain]). The base class uses these type definitions to generate JSON schemas for the API and to validate input automatically.
At the end of the file, you should export the types for easy access by other modules.
The two phases
Understanding the two execution phases is crucial for writing effective enrichers.
Scanning is where the real work happens. This async method receives validated input data as a list of InputType instances and executes your intelligence gathering logic. You might instantiate tools, call external APIs, process results, and build up your output data. This phase should focus purely on gathering and processing data without worrying about the graph database. The base class automatically handles input validation through Pydantic before the scan phase begins.
Postprocessing creates the graph database structure. After scanning completes, this method receives both the results and the original input. It creates Neo4j nodes for each entity, establishes relationships between them, and returns the final results. Keeping this step apart from scanning keeps graph logic out of your business logic.
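This flow can be modeled in a few lines of plain Python. The following is a simplified, stdlib-only sketch and not Flowsint's actual Enricher class. The `SketchEnricher` base, its `run()` and `validate()` methods, and the toy `Domain`/`Ip` dataclasses are hypothetical stand-ins, and an `isinstance` check takes the place of Pydantic validation.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class Domain:  # toy stand-in for flowsint_types.Domain
    domain: str


@dataclass(frozen=True)
class Ip:  # toy stand-in for flowsint_types.Ip
    address: str


class SketchEnricher(ABC):
    """Hypothetical base class modeling the validate -> scan -> postprocess flow."""

    InputType: type = object
    OutputType: type = object

    def validate(self, raw: List[Any]) -> List[Any]:
        # Stand-in for Pydantic validation: keep only items of the declared type.
        return [item for item in raw if isinstance(item, self.InputType)]

    @abstractmethod
    async def scan(self, data: List[Any]) -> List[Any]: ...

    @abstractmethod
    def postprocess(self, results: List[Any], original_input: List[Any]) -> List[Any]: ...

    async def run(self, raw: List[Any]) -> List[Any]:
        data = self.validate(raw)                # 1. validation happens before scan
        results = await self.scan(data)          # 2. gather data, no graph writes
        return self.postprocess(results, data)   # 3. graph writes + final output


class FakeDomainToIp(SketchEnricher):
    InputType = Domain
    OutputType = Ip

    def __init__(self) -> None:
        self.graph_log: List[str] = []  # records what a real enricher would write to Neo4j

    async def scan(self, data: List[Domain]) -> List[Ip]:
        # Pretend every domain resolves to a documentation address (RFC 5737).
        return [Ip(address="192.0.2.1") for _ in data]

    def postprocess(self, results: List[Ip], original_input: List[Domain]) -> List[Ip]:
        # zip() is safe here only because scan() returns exactly one Ip per Domain.
        for domain, ip in zip(original_input, results):
            self.graph_log.append(f"{domain.domain} -RESOLVES_TO-> {ip.address}")
        return results


if __name__ == "__main__":
    enricher = FakeDomainToIp()
    out = asyncio.run(enricher.run([Domain("example.com"), "not-a-domain"]))
    print(out)                # [Ip(address='192.0.2.1')]
    print(enricher.graph_log)  # ['example.com -RESOLVES_TO-> 192.0.2.1']
```

In `FakeDomainToIp`, `scan()` never touches the graph. `postprocess()` receives both the results and the validated input, which is what lets it connect the two.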
Creating a simple enricher
Let's walk through creating a complete enricher from scratch. We'll build an enricher that converts domains to IP addresses using DNS resolution.
Setting up the file structure
Enrichers are organized by their input type. Create a new file in the appropriate directory under flowsint-enrichers/src/flowsint_enrichers/:
cd flowsint-enrichers/src/flowsint_enrichers/domain/
touch to_ip.py
If you're creating an enricher for a new input type, you may need to create a new directory first.
Implementing the basic structure
Start with the imports and class definition:
import socket
from typing import List
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain, Ip

@flowsint_enricher
class DomainToIpEnricher(Enricher):
    """Resolves domain names to their IP addresses using DNS."""

    # Define types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"

    @classmethod
    def documentation(cls) -> str:
        return """
        This enricher resolves domain names to their IP addresses using
        standard DNS queries. It accepts a list of domains and returns
        the corresponding IP addresses.
        """

# Export types at the end of the file
InputType = DomainToIpEnricher.InputType
OutputType = DomainToIpEnricher.OutputType
The name() method returns a unique identifier for this enricher. Use lowercase with underscores, following the pattern inputtype_to_outputtype. The category() method groups related enrichers together in the UI. The key() method specifies which field of the input type serves as the primary identifier (here, the domain field of Domain).
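A regular expression is enough to check the naming convention in your own tests. The pattern below is an assumption derived from the rule stated above: lowercase words joined by underscores around a `_to_` separator. Flowsint is not known to enforce it.

```python
import re

# Hypothetical check for the "inputtype_to_outputtype" naming convention:
# lowercase alphanumeric words joined by underscores, with "_to_" in between.
ENRICHER_NAME_RE = re.compile(r"[a-z0-9]+(?:_[a-z0-9]+)*_to_[a-z0-9]+(?:_[a-z0-9]+)*")


def is_valid_enricher_name(name: str) -> bool:
    """Return True if name follows the inputtype_to_outputtype pattern."""
    return ENRICHER_NAME_RE.fullmatch(name) is not None


for candidate in ["domain_to_ip", "ip_to_ports", "DomainToIp", "domain-to-ip"]:
    print(f"{candidate}: {is_valid_enricher_name(candidate)}")
```

In a test module, `assert is_valid_enricher_name(DomainToIpEnricher.name())` would then catch accidental renames.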
Implementing the scan logic
The scan method contains your core intelligence gathering logic. It receives a list of validated InputType instances and returns a list of OutputType instances.
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Resolve each domain to its IP address.

    Args:
        data: List of Domain objects to resolve

    Returns:
        List of Ip objects
    """
    results: List[OutputType] = []
    for domain in data:
        try:
            # Perform DNS resolution
            ip_address = socket.gethostbyname(domain.domain)
            # Create IP object
            ip = Ip(address=ip_address)
            results.append(ip)
            # Log successful resolution
            Logger.info(
                self.sketch_id,
                {"message": f"Resolved {domain.domain} to {ip_address}"}
            )
        except socket.gaierror as e:
            # DNS resolution failed
            Logger.info(
                self.sketch_id,
                {"message": f"Failed to resolve {domain.domain}: {e}"}
            )
            continue
        except Exception as e:
            # Unexpected error
            Logger.error(
                self.sketch_id,
                {"message": f"Error resolving {domain.domain}: {e}"}
            )
            continue
    return results
This implementation iterates through each domain, performs DNS resolution, creates an IP object for successful resolutions, and logs both successes and failures. The error handling ensures that failures don't crash the entire enricher, which is important when processing many domains.
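`socket.gethostbyname()` is a blocking call and returns a single IPv4 address, so inside an async `scan()` it stalls the event loop during each lookup. One stdlib alternative is the event loop's `getaddrinfo()` coroutine. The default loop runs `socket.getaddrinfo` in a thread pool, and the result covers both IPv4 and IPv6. The `resolve_all()` helper below is a hypothetical sketch, not part of Flowsint.

```python
import asyncio
import socket
from typing import List


async def resolve_all(hostname: str) -> List[str]:
    """Return the unique IPv4/IPv6 addresses for hostname without blocking the loop.

    Hypothetical helper: loop.getaddrinfo() runs the lookup in an executor,
    so other coroutines keep running while it is in progress.
    """
    loop = asyncio.get_running_loop()
    infos = await loop.getaddrinfo(hostname, None, type=socket.SOCK_STREAM)
    addresses: List[str] = []
    for _family, _type, _proto, _canonname, sockaddr in infos:
        address = sockaddr[0]  # sockaddr is (host, port) for IPv4, (host, port, flow, scope) for IPv6
        if address not in addresses:  # keep resolver order, drop duplicates
            addresses.append(address)
    return addresses


async def main() -> None:
    # Resolve several names concurrently; failures come back as socket.gaierror objects.
    names = ["127.0.0.1", "localhost"]
    results = await asyncio.gather(*(resolve_all(n) for n in names), return_exceptions=True)
    for name, result in zip(names, results):
        print(name, result)


if __name__ == "__main__":
    asyncio.run(main())
```

Inside the enricher, the loop body could then append one `Ip(address=address)` per returned address. It would keep catching `socket.gaierror` for names that do not resolve.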
The input data has already been validated by Pydantic before reaching the scan method, so you can trust that all items are proper Domain objects.
Implementing postprocessing
The postprocess method creates graph database nodes and relationships using the new simplified API:
def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
    """
    Create graph nodes and relationships.

    Args:
        results: IP objects from scan phase
        original_input: Original Domain objects

    Returns:
        IP objects (unchanged)
    """
    # Create nodes and relationships.
    # Note: zip() pairs items by position, so this assumes scan() returned exactly
    # one Ip per Domain, in order. Domains skipped on failure break that assumption.
    for domain, ip in zip(original_input, results):
        # Create nodes by passing Pydantic objects directly
        self.create_node(domain)
        self.create_node(ip)
        # Create relationship by passing Pydantic objects directly
        self.create_relationship(domain, ip, "RESOLVES_TO")
        # Log the operation
        self.log_graph_message(
            f"IP found for domain {domain.domain} -> {ip.address}"
        )
    return results
You can pass Pydantic objects directly to create_node() and create_relationship(). The methods automatically infer the node types, primary keys, and property values from the Pydantic models.
The create_node() method accepts a Pydantic object and automatically creates a Neo4j node with the correct label and properties. The create_relationship() method takes two Pydantic objects and a relationship type string, inferring all necessary information from the objects.
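Positional pairing with `zip()` in the postprocess example has a pitfall. `scan()` skips domains that fail to resolve, so after one failure the remaining domains and IPs shift out of alignment and relationships land on the wrong nodes. One simple remedy, sketched below with stand-in dataclasses and a fake resolver, is to record which input produced each result during the scan. Both the fake resolver and the `pairs` list are hypothetical and not part of Flowsint's API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Domain:  # stand-in for flowsint_types.Domain
    domain: str


@dataclass(frozen=True)
class Ip:  # stand-in for flowsint_types.Ip
    address: str


# Fake resolver: "broken.example" has no record, so scan() skips it.
FAKE_DNS: Dict[str, Optional[str]] = {
    "a.example": "192.0.2.10",
    "broken.example": None,
    "c.example": "192.0.2.30",
}


def scan(domains: List[Domain]) -> Tuple[List[Ip], List[Tuple[Domain, Ip]]]:
    results: List[Ip] = []
    pairs: List[Tuple[Domain, Ip]] = []  # remembers which domain produced which IP
    for domain in domains:
        address = FAKE_DNS.get(domain.domain)
        if address is None:
            continue  # failure: nothing appended, like the enricher's except branch
        ip = Ip(address)
        results.append(ip)
        pairs.append((domain, ip))
    return results, pairs


domains = [Domain("a.example"), Domain("broken.example"), Domain("c.example")]
results, pairs = scan(domains)

# zip() pairs by position, so c.example's IP gets attached to broken.example.
naive = [(d.domain, ip.address) for d, ip in zip(domains, results)]
# Pairs recorded during the scan stay correct.
correct = [(d.domain, ip.address) for d, ip in pairs]
print(naive)    # [('a.example', '192.0.2.10'), ('broken.example', '192.0.2.30')]
print(correct)  # [('a.example', '192.0.2.10'), ('c.example', '192.0.2.30')]
```

In an enricher, `scan()` could keep such pairs on the instance, for example in a hypothetical `self.resolved_pairs` list. `postprocess()` could then iterate over it instead of `zip(original_input, results)`.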
Creating an enricher with tools
Most enrichers use external tools for data gathering. Let's create an enricher that uses the Subfinder tool for subdomain enumeration.
Importing the tool
Start by importing the tool along with your other dependencies:
from typing import List
from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain
from tools.network.subfinder import SubfinderTool

@flowsint_enricher
class SubdomainEnricher(Enricher):
    """Enumerates subdomains for given domains using Subfinder."""

    # Define types as base types
    InputType = Domain
    OutputType = Domain

    @classmethod
    def name(cls) -> str:
        return "domain_to_subdomains"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"

# Export types
InputType = SubdomainEnricher.InputType
OutputType = SubdomainEnricher.OutputType
Using the tool in scan
The scan method instantiates and uses the tool:
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Find subdomains using Subfinder tool.

    Args:
        data: List of Domain objects

    Returns:
        List of discovered subdomain Domain objects
    """
    results: List[OutputType] = []
    # Instantiate the tool
    subfinder = SubfinderTool()
    for domain in data:
        Logger.info(
            self.sketch_id,
            {"message": f"Enumerating subdomains for {domain.domain}"}
        )
        try:
            # Launch the tool
            subdomains = subfinder.launch(domain.domain)
            # Convert strings to Domain objects
            for subdomain in subdomains:
                results.append(Domain(domain=subdomain, root=False))
            Logger.info(
                self.sketch_id,
                {"message": f"Found {len(subdomains)} subdomains for {domain.domain}"}
            )
        except Exception as e:
            Logger.error(
                self.sketch_id,
                {"message": f"Error enumerating subdomains for {domain.domain}: {e}"}
            )
            continue
    return results
Notice how the tool returns raw strings, and the enricher converts them into proper Domain objects. This separation of concerns keeps tools simple while enrichers handle type conversion.
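Raw tool output often needs cleanup before it becomes typed data. Enumeration tools can emit mixed-case names, trailing dots, duplicates, or names outside the queried domain. The `normalize_subdomains()` helper below is a hypothetical, stdlib-only sketch of that cleanup, and its filtering rules are assumptions. In the enricher, each cleaned string would then become a `Domain(domain=name, root=False)`.

```python
from typing import Iterable, List


def normalize_subdomains(parent: str, raw: Iterable[str]) -> List[str]:
    """Clean raw tool output for one queried domain (hypothetical helper).

    - strips whitespace and trailing dots, lowercases names
    - drops blanks, duplicates, the parent itself, and names outside the parent
    - returns names sorted for deterministic output
    """
    parent = parent.strip().rstrip(".").lower()
    suffix = "." + parent
    cleaned = set()
    for name in raw:
        name = name.strip().rstrip(".").lower()
        if name and name != parent and name.endswith(suffix):
            cleaned.add(name)
    return sorted(cleaned)


raw_output = ["WWW.example.com", "www.example.com.", "api.example.com", "", "example.com", "evil-example.com"]
print(normalize_subdomains("example.com", raw_output))
# ['api.example.com', 'www.example.com']
```

Because the helper runs once per queried domain, the scan loop always knows which parent each cleaned name belongs to. The postprocess step needs exactly that information to create HAS_SUBDOMAIN relationships.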
Creating graph nodes and relationships
The postprocess phase creates parent-child relationships between domains and subdomains:
def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
    """
    Create graph nodes and relationships for domains and subdomains.

    Args:
        results: Discovered subdomain Domain objects
        original_input: Original parent Domain objects

    Returns:
        Subdomain Domain objects
    """
    # Create nodes for parent domains
    for domain in original_input:
        self.create_node(domain)
    # Create nodes for subdomains and relationships
    for subdomain in results:
        self.create_node(subdomain)
        # Extract parent domain name and create relationship
        parent_domain_name = self._extract_parent_domain(subdomain.domain)
        parent_domain = Domain(domain=parent_domain_name)
        # Create relationship using Pydantic objects
        self.create_relationship(parent_domain, subdomain, "HAS_SUBDOMAIN")
        # Log the operation
        self.log_graph_message(
            f"Subdomain found: {parent_domain_name} -> {subdomain.domain}"
        )
    return results

def _extract_parent_domain(self, subdomain: str) -> str:
    """Extract parent domain from subdomain."""
    # Naive heuristic: keeps only the last two labels, so it returns "co.uk" for
    # "api.example.co.uk" and never links to a queried domain that is itself a subdomain.
    parts = subdomain.split('.')
    if len(parts) >= 2:
        return '.'.join(parts[-2:])
    return subdomain
Adding parameters to enrichers
Many enrichers need user-configurable parameters. Let's create an enricher that scans ports with configurable options.
Defining the parameter schema
The get_params_schema() class method defines what parameters your enricher accepts:
from typing import List, Dict, Any
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Ip, Port
from tools.network.naabu import NaabuTool

@flowsint_enricher
class IpToPortsEnricher(Enricher):
    """Scans IP addresses for open ports."""

    # Define types as base types
    InputType = Ip
    OutputType = Port

    @classmethod
    def name(cls) -> str:
        return "ip_to_ports"

    @classmethod
    def category(cls) -> str:
        return "IP"

    @classmethod
    def key(cls) -> str:
        return "address"

    @classmethod
    def get_params_schema(cls) -> List[Dict[str, Any]]:
        """Define configurable parameters for this enricher."""
        return [
            {
                "name": "mode",
                "type": "select",
                "description": "Scan mode: active scanning or passive enumeration",
                "required": True,
                "default": "passive",
                "options": [
                    {"label": "Passive", "value": "passive"},
                    {"label": "Active", "value": "active"},
                ],
            },
            {
                "name": "port_range",
                "type": "string",
                "description": "Port range to scan (e.g., '1-1000' or '80,443,8080')",
                "required": False,
            },
            {
                "name": "top_ports",
                "type": "select",
                "description": "Scan only the most common ports",
                "required": False,
                "options": [
                    {"label": "Top 100", "value": "100"},
                    {"label": "Top 1000", "value": "1000"},
                ],
            },
            {
                "name": "PDCP_API_KEY",
                "type": "vaultSecret",
                "description": "ProjectDiscovery Cloud Platform API key for passive mode",
                "required": False,
            },
        ]

# Export types
InputType = IpToPortsEnricher.InputType
OutputType = IpToPortsEnricher.OutputType
Each parameter entry defines its name, type, and description, whether it is required, an optional default value, and, for select parameters, the available options. The vaultSecret type integrates with Flowsint's encrypted credential storage.
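The hypothetical resolver below shows how the schema entries relate to the values your enricher later reads from `self.params`. It applies defaults, checks required fields, and validates select options. Flowsint does its own parameter handling. This sketch only mirrors the fields shown above (`name`, `type`, `required`, `default`, `options`) and should not be read as Flowsint's actual logic.

```python
from typing import Any, Dict, List


def resolve_params(schema: List[Dict[str, Any]], supplied: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical: merge user-supplied values with schema defaults and validate them."""
    resolved: Dict[str, Any] = {}
    for spec in schema:
        name = spec["name"]
        if spec["type"] == "vaultSecret":
            # Skipped in this sketch: the example enricher reads secrets via get_secret().
            continue
        value = supplied.get(name, spec.get("default"))
        if value is None:
            if spec.get("required"):
                raise ValueError(f"missing required parameter: {name}")
            continue  # optional and unset: leave it out, so params.get() returns None
        if spec["type"] == "select":
            allowed = {opt["value"] for opt in spec.get("options", [])}
            if value not in allowed:
                raise ValueError(f"{name}={value!r} not in {sorted(allowed)}")
        resolved[name] = value
    return resolved


SCHEMA = [
    {"name": "mode", "type": "select", "required": True, "default": "passive",
     "options": [{"label": "Passive", "value": "passive"}, {"label": "Active", "value": "active"}]},
    {"name": "port_range", "type": "string", "required": False},
    {"name": "top_ports", "type": "select", "required": False,
     "options": [{"label": "Top 100", "value": "100"}, {"label": "Top 1000", "value": "1000"}]},
    {"name": "PDCP_API_KEY", "type": "vaultSecret", "required": False},
]

print(resolve_params(SCHEMA, {"top_ports": "100"}))
# {'mode': 'passive', 'top_ports': '100'}
```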
Using parameters in your enricher
Parameters are accessed through self.params in your scan method, and vaultSecret parameters are read with self.get_secret():
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Scan IPs for open ports using configured parameters.

    Args:
        data: List of Ip objects to scan

    Returns:
        List of Port objects
    """
    results: List[OutputType] = []
    # Extract parameters
    mode = self.params.get("mode", "passive")
    port_range = self.params.get("port_range")
    top_ports = self.params.get("top_ports")
    api_key = self.get_secret("PDCP_API_KEY")
    # Instantiate tool
    naabu = NaabuTool()
    for ip in data:
        Logger.info(
            self.sketch_id,
            {"message": f"Scanning {ip.address} in {mode} mode"}
        )
        try:
            # Launch tool with parameters
            scan_results = naabu.launch(
                target=ip.address,
                mode=mode,
                port_range=port_range,
                top_ports=top_ports,
                api_key=api_key
            )
            # Convert tool results to Port objects
            for result in scan_results:
                port = Port(
                    number=result.get("port"),
                    protocol=result.get("protocol", "tcp").upper(),
                    state="open",
                    service=result.get("service"),
                    banner=result.get("version")
                )
                results.append(port)
        except Exception as e:
            Logger.error(
                self.sketch_id,
                {"message": f"Error scanning {ip.address}: {e}"}
            )
            continue
    return results
Handling multiple output types
Some enrichers produce multiple types of results. You can define a custom return type using Pydantic:
from typing import List
from pydantic import BaseModel
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Website, Email, Phone

class CrawlerResults(BaseModel):
    """Results from web crawler including multiple entity types."""
    website: Website
    emails: List[Email] = []
    phones: List[Phone] = []

@flowsint_enricher
class WebsiteToCrawlerEnricher(Enricher):
    """Crawls websites to extract emails and phone numbers."""

    # Define types as base types
    InputType = Website
    OutputType = CrawlerResults

    # name(), category() and key() classmethods omitted for brevity

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Crawl websites and extract contact information."""
        from tools.network.reconcrawl import ReconCrawlTool

        results: List[OutputType] = []
        crawler_tool = ReconCrawlTool()
        for website in data:
            try:
                # Launch crawler
                crawl_data = crawler_tool.launch(website.url)
                # Extract entities
                emails = [Email(email=e) for e in crawl_data.get("emails", [])]
                phones = [Phone(number=p) for p in crawl_data.get("phones", [])]
                # Create result object
                result = CrawlerResults(
                    website=website,
                    emails=emails,
                    phones=phones
                )
                results.append(result)
            except Exception as e:
                Logger.error(self.sketch_id, {"message": f"Crawl error for {website.url}: {e}"})
        return results

    def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
        """Create nodes for all discovered entities."""
        for result in results:
            # Create website node using Pydantic object
            self.create_node(result.website)
            # Create email nodes and relationships
            for email in result.emails:
                self.create_node(email)
                self.create_relationship(result.website, email, "HAS_EMAIL")
            # Create phone nodes and relationships
            for phone in result.phones:
                self.create_node(phone)
                self.create_relationship(result.website, phone, "HAS_PHONE")
            self.log_graph_message(
                f"Processed {len(result.emails)} emails and {len(result.phones)} phones from {result.website.url}"
            )
        return results

# Export types
InputType = WebsiteToCrawlerEnricher.InputType
OutputType = WebsiteToCrawlerEnricher.OutputType
Registering your enricher
You don't need to register your enricher manually: adding the @flowsint_enricher decorator to your enricher class triggers auto-discovery.
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher

@flowsint_enricher
class MyEnricher(Enricher):
    ...
Testing your enricher
Creating tests helps ensure your enricher works correctly and makes debugging easier. Create a test file in flowsint-enrichers/tests/:
# tests/test_domain_to_ip.py
# The @pytest.mark.asyncio marker requires the pytest-asyncio plugin.
import pytest
from flowsint_enrichers.domain.to_ip import DomainToIpEnricher
from flowsint_types import Domain, Ip

@pytest.mark.asyncio
async def test_enricher_metadata():
    """Test enricher metadata is correctly defined."""
    assert DomainToIpEnricher.name() == "domain_to_ip"
    assert DomainToIpEnricher.category() == "Domain"
    assert DomainToIpEnricher.key() == "domain"

@pytest.mark.asyncio
async def test_type_definitions():
    """Test InputType and OutputType are correctly defined."""
    assert DomainToIpEnricher.InputType == Domain
    assert DomainToIpEnricher.OutputType == Ip

@pytest.mark.asyncio
async def test_scan():
    """Test DNS resolution works (performs a real DNS lookup, so it needs network access)."""
    enricher = DomainToIpEnricher(sketch_id="test", scan_id="test")
    input_data = [Domain(domain="example.com")]
    results = await enricher.scan(input_data)
    assert len(results) > 0
    assert isinstance(results[0], Ip)
    assert results[0].address  # Should have an IP address
These tests verify that your enricher's metadata is correct, type definitions are properly set, and the scan logic produces expected results. Input validation is handled automatically by Pydantic, so you don't need to test preprocessing separately.
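The `test_scan` test above performs a real DNS lookup, so it fails offline and depends on how example.com resolves at the time. A common alternative is to patch `socket.gethostbyname` with `unittest.mock.patch`. The sketch below applies the technique to a stand-in `resolve_domains()` coroutine that mirrors the enricher's loop, so it runs without Flowsint installed. In a real test, the same patch would wrap a call to `DomainToIpEnricher.scan()`. The enricher calls `socket.gethostbyname` through the `socket` module, so patching that attribute affects it too. A plain `asyncio.run()` is used here to avoid depending on pytest-asyncio.

```python
import asyncio
import socket
from typing import Dict, List
from unittest import mock


async def resolve_domains(names: List[str]) -> List[str]:
    """Stand-in for DomainToIpEnricher.scan(): resolve each name, skip failures."""
    results: List[str] = []
    for name in names:
        try:
            results.append(socket.gethostbyname(name))
        except socket.gaierror:
            continue  # same "skip and keep going" behavior as the enricher
    return results


FAKE_RECORDS: Dict[str, str] = {"example.com": "192.0.2.1"}


def fake_gethostbyname(name: str) -> str:
    # Raise the same exception type the real resolver uses for unknown names.
    if name not in FAKE_RECORDS:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    return FAKE_RECORDS[name]


def test_scan_offline() -> None:
    with mock.patch("socket.gethostbyname", side_effect=fake_gethostbyname):
        results = asyncio.run(resolve_domains(["example.com", "does-not-exist.test"]))
    assert results == ["192.0.2.1"]


if __name__ == "__main__":
    test_scan_offline()
    print("ok")
```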
Best practices
When creating enrichers, think carefully about error handling. Intelligence gathering involves many external systems that can fail in unpredictable ways. Your enricher should handle errors gracefully, log failures clearly, and continue processing remaining items rather than crashing entirely.
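For transient failures such as timeouts or dropped connections, retrying a few times with exponential backoff before giving up complements logging and skipping. The `with_retries()` helper below is a hypothetical, stdlib-only sketch. Which exceptions count as retryable, and the delay values, are assumptions to adapt to the API you call.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
) -> T:
    """Await call(), retrying retry_on errors with exponential backoff (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except retry_on:
            if attempt == attempts:
                raise  # give up: the caller logs the error and moves on to the next item
            # Delays of base_delay, 2*base_delay, 4*base_delay, ... plus up to 10% jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("unreachable")


async def demo() -> None:
    calls = {"n": 0}

    async def flaky() -> str:
        # Fails twice with a transient error, then succeeds
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("temporary failure")
        return "ok"

    result = await with_retries(flaky, attempts=3, base_delay=0.01)
    print(result, "after", calls["n"], "calls")


if __name__ == "__main__":
    asyncio.run(demo())
```

Inside `scan()`, a call such as `await with_retries(lambda: self._query_api(item))` could sit inside the existing try/except. Here `_query_api()` stands for a hypothetical async helper of your own. An item that still fails after the last attempt is then logged and skipped as before. Errors outside `retry_on`, such as programming bugs, are raised immediately.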
Always use the Logger utility for tracking progress and errors. The logger integrates with Flowsint's monitoring system and helps users understand what's happening during long-running enrichers. Log successful operations at the info level and errors at the error level.
Define InputType and OutputType as base types (e.g., Domain, not List[Domain]). The base class automatically handles the list wrapping and validation. This makes the type definitions cleaner and more intuitive.
Always export your types at the end of the file using:
InputType = YourEnricher.InputType
OutputType = YourEnricher.OutputType
Use the simplified graph API by passing Pydantic objects directly to create_node() and create_relationship(). This eliminates boilerplate code and reduces errors. The methods automatically infer node types, primary keys, and properties from your Pydantic models.
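As a rough mental model of that inference, the sketch below derives a label from the class name and properties from the fields, then builds a parameterized Cypher `MERGE`. It is a hypothetical illustration using a dataclass, not Flowsint's implementation. The real methods work on Pydantic models, and this guide does not describe how they pick the primary key, so the explicit `primary_key` argument is an assumption.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class Domain:  # stand-in for flowsint_types.Domain
    domain: str
    root: bool = True


def node_merge(obj: Any, primary_key: str) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical: build a Cypher MERGE for obj, keyed on primary_key."""
    label = type(obj).__name__          # label inferred from the class name
    props = asdict(obj)                 # properties inferred from the fields
    key_value = props.pop(primary_key)  # primary key is matched by MERGE, the rest is SET
    # Label and key are interpolated because they come from class/field names,
    # never from user input; values travel as query parameters.
    query = f"MERGE (n:{label} {{{primary_key}: $key}}) SET n += $props"
    return query, {"key": key_value, "props": props}


query, params = node_merge(Domain("example.com", root=True), primary_key="domain")
print(query)   # MERGE (n:Domain {domain: $key}) SET n += $props
print(params)  # {'key': 'example.com', 'props': {'root': True}}
```

Merging on the primary key rather than creating blindly is what lets repeated scans, or two enrichers that discover the same domain, converge on a single node.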
Separate concerns between the two phases. Scanning should focus on gathering and processing data. Postprocessing should create graph structures. Input validation happens automatically through Pydantic, so you don't need to handle it manually.
Use type hints everywhere. They drive Pydantic's validation, improve IDE support, and serve as inline documentation. The InputType and OutputType class attributes should always be Pydantic types, normally from the flowsint-types package (or a custom Pydantic model built from them, as in the crawler example).
When working with tools, remember that they return raw data structures. Your enricher is responsible for converting tool output into proper Flowsint types. This type conversion is typically done in the scan phase.
Document your enricher thoroughly. The class docstring, documentation method, and parameter descriptions all appear in the UI. Clear documentation helps users understand what your enricher does and how to configure it.
Handling API rate limits
When working with rate-limited APIs, add delays between requests:
import asyncio

async def scan(self, data: List[InputType]) -> List[OutputType]:
    """Scan with rate limiting to respect API limits."""
    results: List[OutputType] = []
    delay_seconds = 1  # Delay between requests
    for item in data:
        # _query_api() stands for your own async API helper
        result = await self._query_api(item)
        if result:
            results.append(result)
        # Respect rate limits
        await asyncio.sleep(delay_seconds)
    return results
Fallback data sources
Implement fallback logic when primary sources fail:
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """Try multiple data sources with fallback logic."""
    results: List[OutputType] = []
    for domain in data:
        # Try primary source (_query_primary_source() stands for your own helper)
        result = self._query_primary_source(domain)
        if not result:
            # Fall back to secondary source
            Logger.info(
                self.sketch_id,
                {"message": f"Primary source failed for {domain.domain}, trying fallback"}
            )
            result = self._query_fallback_source(domain)
        if result:
            results.append(result)
    return results
Troubleshooting
If your enricher doesn't appear in the API, verify that the class carries the @flowsint_enricher decorator, that the file sits in the appropriate directory under flowsint-enrichers/src/flowsint_enrichers/, and that you've restarted the API server. The registry is loaded at startup, so changes require a restart.
For import errors, make sure all your dependencies are installed and the enricher file has no syntax errors. Check that you're importing from the correct packages.
If the scan method isn't finding any results, add logging statements to debug what's happening. Verify that tools are installed and accessible, API keys are valid if required, and input data is in the expected format.
When graph relationships aren't appearing, check that you're creating both nodes and relationships in the postprocess method. Verify that the relationship type name is correct and that you're passing the right node objects to create_relationship().
Next steps
Once you've created and registered your enricher, it becomes available through the API for users to run. Enrichers can be chained together into Flows where the output of one enricher feeds into the input of another. This enables complex intelligence gathering sequences.
Remember that enrichers are the heart of Flowsint's intelligence gathering capabilities. Well-designed enrichers that handle errors gracefully, log progress clearly, and create meaningful graph relationships make the entire platform more powerful and user-friendly.
If you create enrichers that could help the community, please consider open-sourcing them. Help the community as much as it helps you!