Apache Kafka has become the backbone of modern event-driven architectures, enabling real-time streaming of data across distributed systems. While Kafka offers a wide variety of pre-built connectors through Kafka Connect, organizations often face the need to integrate with custom or niche systems that do not yet have an available connector.

One common scenario is the need to consume data from HTTP endpoints, whether from REST APIs, streaming APIs, or custom web services. While there are community connectors for HTTP, building a custom Kafka Connect HTTP Source Connector gives you complete flexibility in handling authentication, pagination, response parsing, and custom error handling.

This article walks you through the process of building, configuring, deploying, and using a custom HTTP Source Connector for Kafka Connect. We will also provide example code, configuration details, and deployment steps to ensure you can follow along.

Understanding Kafka Connect and Source Connectors

Kafka Connect is a framework built on top of Kafka to integrate external systems with Kafka topics. Connectors come in two types:

  • Source connectors: Pull data from an external system and write it into Kafka topics.

  • Sink connectors: Read data from Kafka topics and push it into external systems.

In our case, the goal is to implement a Source Connector that periodically polls an HTTP API endpoint and publishes the data into Kafka topics.

Design of a Custom HTTP Source Connector

A Kafka Connect Source Connector involves two main classes:

  1. Connector class (SourceConnector) – Defines configuration parameters and creates tasks.

  2. Task class (SourceTask) – Implements the logic to pull data from the external system and push it to Kafka.

Key features of our HTTP Source Connector:

  • Configurable HTTP endpoint URL

  • Support for GET requests

  • Configurable polling interval

  • Parsing JSON responses

  • Writing data into Kafka topics

Setting Up the Project

We will create a Maven project for our connector.

pom.xml basic structure:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.kafka</groupId>
    <artifactId>http-source-connector</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <!-- Kafka Connect API (supplied by the Connect runtime, hence "provided") -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.5.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- JSON parsing -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
        <!-- HTTP client: the fluent Request API used by the task lives in fluent-hc,
             which pulls in httpclient transitively -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>fluent-hc</artifactId>
            <version>4.5.14</version>
        </dependency>
    </dependencies>
</project>

Implementing the Connector Class

The Connector class defines the configuration for our connector and creates tasks.

HttpSourceConnector.java:

package com.example.kafka.http;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HttpSourceConnector extends SourceConnector {
    public static final String TOPIC_CONFIG = "topic";
    public static final String URL_CONFIG = "http.url";
    public static final String POLL_INTERVAL_MS_CONFIG = "poll.interval.ms";

    private Map<String, String> configProps;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // Keep a copy of the connector configuration to hand to the tasks
        this.configProps = new HashMap<>(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return HttpSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task receives the same configuration. With tasks.max > 1 each task
        // polls the same URL, so the topic receives duplicate records.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(configProps);
        }
        return configs;
    }

    @Override
    public void stop() {
        // Cleanup resources if needed
    }

    @Override
    public ConfigDef config() {
        // topic and http.url have no default and are therefore required
        return new ConfigDef()
            .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Kafka topic to publish data")
            .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "HTTP endpoint URL")
            .define(POLL_INTERVAL_MS_CONFIG, ConfigDef.Type.INT, 10000, ConfigDef.Importance.MEDIUM, "Polling interval in milliseconds");
    }
}
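
Because taskConfigs hands every task the same settings, raising tasks.max only makes sense if the work can be divided. The following is a minimal sketch of one way to divide it, assuming the connector were extended to accept several endpoint URLs. The TaskUrlAssigner class and the idea of a multi-URL setting are hypothetical and not part of the connector above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: spreads endpoint URLs across at most maxTasks groups. */
final class TaskUrlAssigner {

    private TaskUrlAssigner() {}

    /**
     * Round-robin assignment. Returns one list of URLs per task and never
     * creates more groups than there are URLs, so no task starts idle.
     */
    static List<List<String>> assign(List<String> urls, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be >= 1");
        }
        int groupCount = Math.min(maxTasks, urls.size());
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        // URL i goes to group i mod groupCount
        for (int i = 0; i < urls.size(); i++) {
            groups.get(i % groupCount).add(urls.get(i));
        }
        return groups;
    }
}
```

Inside taskConfigs, each returned group would become one task configuration map, so that every URL is polled by exactly one task.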

Implementing the Task Class

The Task class contains the logic for calling the HTTP API and producing records to Kafka.

HttpSourceTask.java:

package com.example.kafka.http;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.client.fluent.Request;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HttpSourceTask extends SourceTask {
    private String topic;
    private String url;
    private int pollIntervalMs;
    private long lastPoll = 0;

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        this.topic = props.get(HttpSourceConnector.TOPIC_CONFIG);
        this.url = props.get(HttpSourceConnector.URL_CONFIG);
        this.pollIntervalMs = Integer.parseInt(
            props.getOrDefault(HttpSourceConnector.POLL_INTERVAL_MS_CONFIG, "10000"));
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - lastPoll < pollIntervalMs) {
            // Not due yet: pause briefly so the worker thread does not spin
            Thread.sleep(1000);
            return Collections.emptyList();
        }
        // Record the attempt up front so a failed request also waits a full interval
        lastPoll = now;

        try {
            // Fetch the response body (5-second connect and read timeouts);
            // returnContent() throws for HTTP status codes of 300 and above
            String response = Request.Get(url)
                .connectTimeout(5000)
                .socketTimeout(5000)
                .execute()
                .returnContent()
                .asString();

            // Parse the body to verify that it is valid JSON
            JsonNode json = mapper.readTree(response);

            // The partition identifies the source; the offset marks this fetch
            Map<String, Object> sourcePartition = Collections.singletonMap("url", url);
            Map<String, Object> sourceOffset = Collections.singletonMap("timestamp", System.currentTimeMillis());

            // One record per poll, carrying the whole response as a string
            SourceRecord record = new SourceRecord(
                sourcePartition,
                sourceOffset,
                topic,
                Schema.STRING_SCHEMA,
                json.toString()
            );

            return Collections.singletonList(record);

        } catch (IOException e) {
            // Covers network failures, non-2xx responses, and invalid JSON.
            // A production connector should use a logger instead.
            e.printStackTrace();
            return Collections.emptyList();
        }
    }

    @Override
    public void stop() {
        // Nothing to clean up
    }
}
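
The poll() method above sleeps for a fixed second whenever the interval has not yet elapsed, which means a request can fire up to a second late and sub-second intervals are over-waited. One possible refinement, sketched here under the assumption that a one-second cap is acceptable for noticing shutdown, is to sleep only for the time that remains. PollSchedule and MAX_SLEEP_MS are hypothetical names introduced for illustration.

```java
/** Hypothetical helper that works out how long poll() should sleep. */
final class PollSchedule {

    /** Upper bound on a single sleep so the task can react to stop() quickly. */
    static final long MAX_SLEEP_MS = 1000L;

    private PollSchedule() {}

    /**
     * Returns 0 when the next request is due, otherwise the time left until
     * it is due, capped at MAX_SLEEP_MS.
     */
    static long sleepMillis(long nowMs, long lastPollMs, long intervalMs) {
        long elapsed = nowMs - lastPollMs;
        if (elapsed >= intervalMs) {
            return 0L;
        }
        return Math.min(intervalMs - elapsed, MAX_SLEEP_MS);
    }
}
```

In poll(), the task would call PollSchedule.sleepMillis(now, lastPoll, pollIntervalMs), sleep for the returned duration if it is greater than zero, and return an empty list; a result of zero means the HTTP request should be made now.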

Packaging the Connector

After implementing the classes, package the connector into a JAR:

mvn clean package

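This creates a JAR file in the target/ directory, e.g., http-source-connector-1.0-SNAPSHOT.jar. A plain mvn package build puts only the connector’s own classes into that JAR, so the HTTP client JARs it depends on must also end up in the plugin directory, for example by building a shaded JAR or by copying the dependency JARs alongside it. The Connect API itself is supplied by the Kafka Connect runtime and should not be copied.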

Deploying the Connector

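  1. Copy the JAR into its own subdirectory of a directory listed in the worker’s plugin.path setting (the path below assumes $KAFKA_CONNECT_HOME/plugins is one of them):

mkdir -p $KAFKA_CONNECT_HOME/plugins/http-source-connector/
cp target/http-source-connector-1.0-SNAPSHOT.jar $KAFKA_CONNECT_HOME/plugins/http-source-connector/

  2. Restart Kafka Connect to load the new plugin.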

Connector Configuration

Create a JSON configuration file, e.g., http-source-config.json:

{
"name": "http-source-connector",
"config": {
"connector.class": "com.example.kafka.http.HttpSourceConnector",
"tasks.max": "1",
"topic": "http-topic",
"http.url": "https://jsonplaceholder.typicode.com/posts",
"poll.interval.ms": "10000"
}
}

Deploy the connector by posting this config to the Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" \
--data @http-source-config.json \
http://localhost:8083/connectors
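
After posting the configuration, it can be useful to confirm that the connector and its task actually started. Kafka Connect exposes this at GET /connectors/{name}/status. The sketch below builds that request with the JDK's java.net.http API (Java 11 or later); the ConnectRestRequests class is a hypothetical helper, and the worker address is assumed to be the same localhost:8083 used above.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical helper for talking to the Kafka Connect REST API. */
final class ConnectRestRequests {

    private ConnectRestRequests() {}

    /**
     * Builds GET {baseUrl}/connectors/{name}/status.
     * The connector name is assumed to need no URL encoding.
     */
    static HttpRequest statusRequest(String baseUrl, String connectorName) {
        // Tolerate a trailing slash on the base URL
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return HttpRequest.newBuilder(
                        URI.create(base + "/connectors/" + connectorName + "/status"))
                .header("Accept", "application/json")
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }
}
```

Sending the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) returns a JSON document that reports a state such as RUNNING or FAILED for the connector and for each of its tasks.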

Verifying Data Flow

Check that data is being published into the configured topic:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic http-topic \
--from-beginning

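You should see one message per poll, each carrying the full JSON response body from the HTTP API as a string. How that string is rendered depends on the worker’s value converter; with JsonConverter and schemas enabled, for example, it appears wrapped in a schema/payload envelope.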

Enhancements and Best Practices

  • Error Handling: Add retries, backoff strategies, and dead-letter topic support.

  • Authentication: Support for Basic Auth, Bearer Tokens, or OAuth.

  • Data Transformation: Use Kafka Connect’s Single Message Transforms (SMTs) for shaping data before it reaches Kafka.

  • Pagination Handling: Extend the task logic to handle paginated APIs.

  • Schema Support: Instead of publishing raw JSON strings, map responses into structured schemas.
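
To illustrate the error-handling item, here is a minimal sketch of retrying a failed call with exponential backoff. It uses only the JDK and is not tied to Kafka Connect; the Backoff class, its method names, and the chosen delays are assumptions made for this example rather than part of the connector above.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper with exponential backoff. */
final class Backoff {

    private Backoff() {}

    /**
     * Delay before the retry that follows failed attempt number `attempt`
     * (0-based): baseMs * 2^attempt, capped at maxMs. The exponent itself is
     * capped at 30 to keep the arithmetic free of overflow.
     */
    static long delayMillis(int attempt, long baseMs, long maxMs) {
        long factor = 1L << Math.min(Math.max(attempt, 0), 30);
        if (baseMs > maxMs / factor) {
            return maxMs;
        }
        return baseMs * factor;
    }

    /**
     * Runs the call up to maxAttempts times, sleeping between failures.
     * Rethrows the last failure if every attempt fails.
     */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts,
                               long baseMs, long maxMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (InterruptedException e) {
                throw e; // never retry after an interrupt
            } catch (Exception e) {
                last = e;
                // Sleep only if another attempt will follow
                if (attempt < maxAttempts - 1) {
                    Thread.sleep(delayMillis(attempt, baseMs, maxMs));
                }
            }
        }
        throw last;
    }
}
```

In the task, the HTTP request could be wrapped as Backoff.callWithRetry(() -> fetch(url), 3, 500, 5000), where fetch stands for whatever method performs the request. A real connector would likely retry only on errors that are plausibly transient, such as timeouts or 5xx responses.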

Conclusion

Implementing a custom Kafka Connect HTTP Source Connector provides full control over how data is ingested from HTTP endpoints into Kafka. While there are existing connectors that can handle generic HTTP ingestion, custom connectors allow you to:

  • Tailor polling strategies to your system’s requirements.

  • Handle specific authentication mechanisms.

  • Enforce custom parsing, filtering, or enrichment.

  • Ensure compatibility with internal APIs that may not conform to public standards.

In this guide, we built a connector from scratch, covering configuration, implementation, deployment, and usage. The same structure carries over to other HTTP-based data sources once you add the authentication, pagination, and error handling that a given API requires.

As organizations continue to evolve towards real-time architectures, being able to bridge legacy HTTP APIs with Kafka provides a significant advantage. Once data lands in Kafka, it can be consumed by downstream applications, analytics pipelines, or real-time dashboards, unlocking powerful new capabilities.

Ultimately, the flexibility of Kafka Connect combined with the extensibility of custom connectors ensures that no system is out of reach. With a carefully designed connector, you can reliably integrate your APIs into Kafka and harness the full power of streaming data.