Adding Connectors
dbmazz uses a pluggable connector architecture. This guide explains how to add support for new databases.
Architecture Overview
Core Traits
Source Trait
#[async_trait]
pub trait Source: Send + Sync {
    /// Connect to the source database.
    async fn connect(&mut self) -> Result<(), CoreError>;

    /// Start streaming CDC events into `tx`; runs until the stream
    /// ends or an error occurs.
    async fn stream(&mut self, tx: Sender<CdcRecord>) -> Result<(), CoreError>;

    /// Save a checkpoint position (e.g. so replication can resume
    /// from here after a restart).
    async fn checkpoint(&self, position: &SourcePosition) -> Result<(), CoreError>;

    /// Get the current position, if one has been recorded yet.
    fn current_position(&self) -> Option<SourcePosition>;

    /// Disconnect and clean up any held resources.
    async fn disconnect(&mut self) -> Result<(), CoreError>;
}
Sink Trait
#[async_trait]
pub trait Sink: Send + Sync {
    /// Connect to the sink database.
    async fn connect(&mut self) -> Result<(), CoreError>;

    /// Write a batch of records; the whole batch either succeeds or
    /// returns an error (per-record semantics are connector-specific).
    async fn write_batch(&mut self, records: Vec<CdcRecord>) -> Result<(), CoreError>;

    /// Create the destination table for `schema` if it does not exist.
    async fn create_table(&mut self, schema: &TableSchema) -> Result<(), CoreError>;

    /// Describe what this sink supports (upserts, deletes, schema
    /// evolution, batch limits, loading model).
    fn capabilities(&self) -> SinkCapabilities;

    /// Disconnect and clean up any held resources.
    async fn disconnect(&mut self) -> Result<(), CoreError>;
}
CdcRecord
The normalized record format:
/// The normalized change record exchanged between sources and sinks.
pub enum CdcRecord {
    /// A newly inserted row, identified by schema + table, with all
    /// column values.
    Insert {
        schema: String,
        table: String,
        values: HashMap<String, Value>,
    },
    /// An updated row: `values` holds the new column values, `key`
    /// identifies which row was updated.
    Update {
        schema: String,
        table: String,
        values: HashMap<String, Value>,
        key: HashMap<String, Value>,
    },
    /// A deleted row, identified only by its key columns.
    Delete {
        schema: String,
        table: String,
        key: HashMap<String, Value>,
    },
    /// A DDL change on a table (see `SchemaChangeType` for the kinds).
    SchemaChange {
        schema: String,
        table: String,
        change_type: SchemaChangeType,
    },
    /// Transaction start marker.
    Begin { xid: u64 },
    /// Transaction commit marker; `lsn` is the commit position in the
    /// source's log.
    Commit { xid: u64, lsn: u64 },
    /// Liveness marker emitted when no data changes are flowing.
    Heartbeat { timestamp: i64 },
}
Adding a New Source
1. Create Directory Structure
src/connectors/sources/mysql/
├── mod.rs # Module exports
├── source.rs # MySQLSource implementation
├── config.rs # Configuration
├── decoder.rs # Binlog decoder
└── README.md # Documentation
2. Implement the Source
// src/connectors/sources/mysql/source.rs
use crate::core::{Source, CdcRecord, CoreError, SourcePosition};
use async_trait::async_trait;
use tokio::sync::mpsc::Sender;
/// CDC source that reads the MySQL binlog.
pub struct MySQLSource {
    // Connection settings (URL, binlog options, …).
    config: MySQLConfig,
    // `None` until `connect()` succeeds.
    connection: Option<MySQLConnection>,
    // Last known replication position; `None` before streaming starts.
    position: Option<SourcePosition>,
}

impl MySQLSource {
    /// Build an unconnected source from its configuration; call
    /// `connect()` before streaming.
    pub fn new(config: MySQLConfig) -> Self {
        Self {
            config,
            connection: None,
            position: None,
        }
    }
}
#[async_trait]
impl Source for MySQLSource {
    /// Open a connection to MySQL using the configured URL.
    async fn connect(&mut self) -> Result<(), CoreError> {
        let conn = mysql_async::Conn::new(&self.config.url)
            .await
            .map_err(|e| CoreError::ConnectionError {
                message: e.to_string(),
            })?;
        self.connection = Some(conn);
        Ok(())
    }

    /// Read binlog events and emit one `CdcRecord` per event until the
    /// binlog stream ends or the receiving side of `tx` is dropped.
    async fn stream(&mut self, tx: Sender<CdcRecord>) -> Result<(), CoreError> {
        // Move the connection out of `self`: `get_binlog_stream` consumes
        // the connection, and keeping it borrowed from `self.connection`
        // would conflict with the `self.decode_event` call inside the
        // loop. A fresh `connect()` is required to stream again.
        let conn = self.connection.take().ok_or(CoreError::NotConnected)?;

        // Subscribe to the binlog at the configured position.
        let mut binlog_stream = conn.get_binlog_stream(&self.config).await?;
        while let Some(event) = binlog_stream.next().await {
            let record = self.decode_event(event)?;
            // A send error means the receiver was dropped — stop streaming.
            tx.send(record).await.map_err(|_| CoreError::ChannelClosed)?;
        }
        Ok(())
    }

    /// Persist the GTID or binlog file/offset position.
    async fn checkpoint(&self, position: &SourcePosition) -> Result<(), CoreError> {
        // TODO: save `position` (GTID set or binlog file + offset).
        let _ = position;
        Ok(())
    }

    fn current_position(&self) -> Option<SourcePosition> {
        self.position.clone()
    }

    /// Close the connection if one is open; idempotent.
    async fn disconnect(&mut self) -> Result<(), CoreError> {
        if let Some(conn) = self.connection.take() {
            conn.disconnect().await?;
        }
        Ok(())
    }
}
3. Register the Source
// src/connectors/sources/mod.rs
pub mod postgres;
pub mod mysql; // Add new module
use crate::core::{Source, CoreError};
use crate::config::SourceConfig;
/// Instantiate the source named by `config.source_type`.
///
/// Returns `CoreError::UnsupportedSource` for unknown type strings.
pub fn create_source(config: &SourceConfig) -> Result<Box<dyn Source>, CoreError> {
    let source: Box<dyn Source> = match config.source_type.as_str() {
        "postgres" => Box::new(postgres::PostgresSource::new(config.into())),
        "mysql" => Box::new(mysql::MySQLSource::new(config.into())),
        _ => {
            return Err(CoreError::UnsupportedSource {
                source_type: config.source_type.clone(),
            });
        }
    };
    Ok(source)
}
Adding a New Sink
1. Create Directory Structure
src/connectors/sinks/clickhouse/
├── mod.rs # Module exports
├── sink.rs # ClickHouseSink implementation
├── config.rs # Configuration
├── writer.rs # Batch writer
└── README.md # Documentation
2. Implement the Sink
// src/connectors/sinks/clickhouse/sink.rs
use crate::core::{Sink, CdcRecord, CoreError, SinkCapabilities, LoadingModel};
use async_trait::async_trait;
/// Sink that writes CDC batches into ClickHouse.
pub struct ClickHouseSink {
    // Connection settings (URL, credentials, …).
    config: ClickHouseConfig,
    // `None` until `connect()` is called.
    client: Option<clickhouse::Client>,
}

impl ClickHouseSink {
    /// Build an unconnected sink from its configuration; call
    /// `connect()` before writing.
    pub fn new(config: ClickHouseConfig) -> Self {
        Self {
            config,
            client: None,
        }
    }
}
#[async_trait]
impl Sink for ClickHouseSink {
    /// Build the ClickHouse client. The client is lazy — no network
    /// I/O happens here, so this step cannot fail.
    async fn connect(&mut self) -> Result<(), CoreError> {
        let client = clickhouse::Client::default()
            .with_url(&self.config.url)
            .with_user(&self.config.user)
            .with_password(&self.config.password);
        self.client = Some(client);
        Ok(())
    }

    /// Write a batch, issuing one INSERT per destination table.
    async fn write_batch(&mut self, records: Vec<CdcRecord>) -> Result<(), CoreError> {
        let client = self.client.as_ref().ok_or(CoreError::NotConnected)?;

        // Group records by table so each table gets a single insert.
        let by_table = group_by_table(records);
        for (table, table_records) in by_table {
            // `insert` must be mutable: in the `clickhouse` crate,
            // `Insert::write` takes `&mut self` and is async.
            let mut insert = client.insert(&table)?;
            for record in table_records {
                insert.write(&record.to_clickhouse_row()).await?;
            }
            // `end()` flushes and finalizes the insert.
            insert.end().await?;
        }
        Ok(())
    }

    /// Run the generated `CREATE TABLE IF NOT EXISTS` DDL.
    async fn create_table(&mut self, schema: &TableSchema) -> Result<(), CoreError> {
        let ddl = self.generate_create_table(schema);
        self.client
            .as_ref()
            .ok_or(CoreError::NotConnected)?
            .query(&ddl)
            .execute()
            .await?;
        Ok(())
    }

    fn capabilities(&self) -> SinkCapabilities {
        SinkCapabilities {
            loading_model: LoadingModel::Streaming,
            supports_upsert: true,
            supports_delete: true,
            supports_schema_evolution: true,
            max_batch_size: 100_000,
        }
    }

    /// Drop the client; there is no explicit close in the HTTP client.
    async fn disconnect(&mut self) -> Result<(), CoreError> {
        self.client = None;
        Ok(())
    }
}
3. Register the Sink
// src/connectors/sinks/mod.rs
pub mod starrocks;
pub mod clickhouse; // Add new module
use crate::core::{Sink, CoreError};
use crate::config::SinkConfig;
/// Instantiate the sink named by `config.sink_type`.
///
/// Returns `CoreError::UnsupportedSink` for unknown type strings.
pub fn create_sink(config: &SinkConfig) -> Result<Box<dyn Sink>, CoreError> {
    let sink: Box<dyn Sink> = match config.sink_type.as_str() {
        "starrocks" => Box::new(starrocks::StarRocksSink::new(config.into())),
        "clickhouse" => Box::new(clickhouse::ClickHouseSink::new(config.into())),
        _ => {
            return Err(CoreError::UnsupportedSink {
                sink_type: config.sink_type.clone(),
            });
        }
    };
    Ok(sink)
}
Type Mapping
Each connector must map between the database's native column types and the normalized `Value` variants carried inside `CdcRecord`:
// src/connectors/sources/mysql/types.rs
/// Convert a raw MySQL column value into the normalized `Value` type,
/// keyed on the MySQL column type name.
///
/// Unrecognized types fall back to `Value::Bytes` with the raw data.
// NOTE(review): the `parse_*` helpers are defined elsewhere; this
// assumes `data` is the textual/binary encoding they expect — confirm
// against the decoder.
pub fn mysql_to_value(mysql_type: &str, data: &[u8]) -> Value {
    match mysql_type {
        // TINYINT/SMALLINT/INT all fit in 32 bits.
        "TINYINT" | "SMALLINT" | "INT" => Value::Int32(parse_int(data)),
        "BIGINT" => Value::Int64(parse_bigint(data)),
        "FLOAT" => Value::Float32(parse_float(data)),
        "DOUBLE" => Value::Float64(parse_double(data)),
        // Lossy UTF-8: invalid sequences become replacement characters.
        "VARCHAR" | "TEXT" => Value::String(String::from_utf8_lossy(data).into()),
        "DATETIME" | "TIMESTAMP" => Value::Timestamp(parse_datetime(data)),
        "JSON" => Value::Json(parse_json(data)),
        _ => Value::Bytes(data.to_vec()),
    }
}
Testing Connectors
Unit Tests
#[cfg(test)]
mod tests {
    use super::*;

    /// Connect/disconnect round trip.
    #[tokio::test]
    #[ignore] // Requires running MySQL — consistent with the integration tests.
    async fn test_mysql_connection() {
        let config = MySQLConfig::from_url("mysql://root@localhost/test");
        let mut source = MySQLSource::new(config);
        assert!(source.connect().await.is_ok());
        assert!(source.disconnect().await.is_ok());
    }

    /// A decoded row maps to an `Insert` record; needs no database.
    #[tokio::test]
    async fn test_record_conversion() {
        let mysql_row = create_test_row();
        let record = MySQLSource::row_to_record(&mysql_row);
        assert!(matches!(record, CdcRecord::Insert { .. }));
    }
}
Integration Tests
/// End-to-end check: stream the binlog, insert a row, and verify an
/// `Insert` record arrives on the channel.
#[tokio::test]
#[ignore] // Requires running MySQL
async fn test_mysql_cdc_stream() {
    let config = MySQLConfig::from_env();
    let mut source = MySQLSource::new(config);
    source.connect().await.unwrap();

    let (tx, mut rx) = tokio::sync::mpsc::channel(100);

    // Stream in the background; the task holds `source` for its lifetime.
    tokio::spawn(async move {
        source.stream(tx).await.unwrap();
    });

    // Insert test data into the watched table.
    insert_test_row().await;

    // Verify a record is received.
    // NOTE(review): assumes the first record seen is the Insert — if the
    // source also emits Begin/Heartbeat markers first, this assertion
    // needs to skip them; confirm against the decoder's output order.
    let record = rx.recv().await.unwrap();
    assert!(matches!(record, CdcRecord::Insert { .. }));
}
Documentation
Create comprehensive documentation:
<!-- src/connectors/sources/mysql/README.md -->
# MySQL Source Connector
## Requirements
- MySQL 5.7+ or 8.0+
- Binary logging enabled
- GTID mode recommended
## Configuration
| Variable | Required | Description |
|----------|----------|-------------|
| SOURCE_URL | Yes | MySQL connection URL |
| ... | ... | ... |
## Limitations
- ...
Contributing
See CONTRIBUTING_CONNECTORS.md for the complete guide.