
Syncing collections from MongoDB to PostgreSQL tables with Apache Flink CDC

Recently I’ve struggled a lot with preparing a working example of an Apache Flink CDC job that synchronizes collections from MongoDB into PostgreSQL tables.

I’ve faced some obstacles, gaps in the documentation and version incompatibilities, but finally I have something working.

As there are not many examples on the Internet, I’ve decided to share my results, along with a few comments to help you with potential issues.

The purpose and the solution

What I wanted to achieve was a CDC job that constantly syncs changes from specified MongoDB collections into separate PostgreSQL tables.

The code below fulfils its purpose, but please keep in mind that I’m still quite new to Apache Flink CDC, so it may not be the best possible way to do it.

Sample code

The heart of the job is basically a class with a main method:

package software.hvb.apache.flink.cdc.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import software.hvb.apache.flink.cdc.example.extract.MongoOperation;
import software.hvb.apache.flink.cdc.example.extract.MongoOperationExtractor;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;


public class SyncMongoToPostgresJob {

    public static void main(String[] args) throws Exception {

        // helping configuration and mappings
        // from MongoDB collections to PostgreSQL tables

        var mongoDatabase = "mongo_database";
        var mongoCollections = List.of("collectionABC", "collectionDEF");

        var collectionToDocumentKeyPath = Map.of(
            mongoCollections.get(0), "_id",
            mongoCollections.get(1), "_id.value"
        );

        var collectionToTable = Map.of(
            mongoCollections.get(0), "table_abc_cdc",
            mongoCollections.get(1), "table_def_cdc"
        );

        var extractor = new MongoOperationExtractor();

        try (StreamExecutionEnvironment environment = 
                StreamExecutionEnvironment.getExecutionEnvironment()) {

            // initial stream settings
            var stream = environment
                .enableCheckpointing(5000)
                .fromSource(
                    createSource(mongoDatabase, mongoCollections),
                    WatermarkStrategy.noWatermarks(),
                    "Source-MongoDB"
                )
                .setParallelism(2)
                // processor which routes documents based on the collection name,
                // or passes data without a collection name to the default sink
                .process(new ProcessFunction<String, String>() {

                    @Override
                    public void processElement(
                        String value,
                        ProcessFunction<String, String>.Context ctx,
                        Collector<String> out
                    ) {
                        try {
                            // add tag with collection name to route the stream
                            ctx.output(
                                createTag(extractor.extractCollectionName(value)), 
                                value
                            );
                        } catch (Exception ex) {
                            // route other data to the default sink
                            out.collect(value);
                        }
                    }
                });

            mongoCollections.forEach(
                collection -> {
                    // path needed to properly extract the document key, as it can be nested
                    String documentKeyPath = collectionToDocumentKeyPath.get(collection);
                    String postgresTable = collectionToTable.get(collection);
                    stream
                        // connect sink to stream based on collection name
                        .getSideOutput(createTag(collection))
                        .map(operation -> extractor.extract(operation, documentKeyPath))
                        .sinkTo(createSink(postgresTable))
                        .name("Sink-PostgreSQL: " + postgresTable)
                        .setParallelism(1);
                }
            );

            // set print to stdout as default sink
            // in theory should never happen
            stream.print();

            environment.execute(
                "Syncing collections from MongoDB to PostgreSQL tables: " 
                    + mongoCollections
            );
        }
    }

    private static MongoDBSource<String> createSource(
        String database, List<String> collections
    ) {
        return MongoDBSource.<String>builder()
            .hosts("host.docker.internal:27017")
            .username("root")
            .password("password")
            .connectionOptions("authSource=admin")
            .collectionList(
                collections.stream()
                    .map(collection -> database + "." + collection)
                    .toArray(String[]::new)
            )
            .deserializer(new JsonDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
    }

    private static OutputTag<String> createTag(String id) {
        return new OutputTag<>(id) {
            // empty by design
        };
    }

    private static Sink<MongoOperation> createSink(String tableName) {
        return JdbcSink.<MongoOperation>builder()
            .withQueryStatement(
                "INSERT INTO " + tableName +
                "  (document_key, operation_time, operation_type, document) " +
                "VALUES " + 
                "  (?, ?, ?, ?::JSONB) " +
                "ON CONFLICT (document_key, operation_time, operation_type) " +
                "   DO UPDATE SET document = EXCLUDED.document::JSONB",
                (statement, operation) -> {
                    statement.setString(1, operation.getDocumentKey());
                    statement.setTimestamp(2, new Timestamp(
                        operation.getTimestampMs() != null 
                            ? operation.getTimestampMs() : 0
                    ));
                    statement.setString(3, operation.getType());
                    statement.setObject(4, operation.getDocument());
                }
            )
            .buildAtLeastOnce(
                new JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://host.docker.internal:5433/postgres")
                    .withDriverName("org.postgresql.Driver")
                    .withUsername("postgres")
                    .withPassword("mysecretpassword")
                    .build()
            );
    }
}
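
The sink’s INSERT … ON CONFLICT statement assumes that the target tables (table_abc_cdc and table_def_cdc) already exist and have a unique constraint on (document_key, operation_time, operation_type). The helper below is only a minimal sketch of how such a table could be created through plain JDBC; the class name and the column types are my assumptions, so adjust them to your needs.

package software.hvb.apache.flink.cdc.example;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical one-off helper that creates one of the target tables.
// The column types are an assumption; only the unique constraint is strictly
// required by the ON CONFLICT clause used in createSink(...).
public class CreateCdcTable {

    public static void main(String[] args) throws Exception {
        try (Connection connection = DriverManager.getConnection(
                "jdbc:postgresql://host.docker.internal:5433/postgres",
                "postgres", "mysecretpassword");
             Statement statement = connection.createStatement()) {

            statement.execute(
                "CREATE TABLE IF NOT EXISTS table_abc_cdc (" +
                "  document_key   TEXT, " +
                "  operation_time TIMESTAMP, " +
                "  operation_type TEXT, " +
                "  document       JSONB, " +
                "  UNIQUE (document_key, operation_time, operation_type)" +
                ")"
            );
        }
    }
}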

The second necessary class is the extractor, which extracts data from the raw JSON string into a Java object. This part depends heavily on your specific preferences and approach, so treat it as a generic sample.

In my case I’ve decided to store every change in MongoDB as a separate event entry in the PostgreSQL table.

package software.hvb.apache.flink.cdc.example.extract;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.Serializable;

import static org.apache.commons.lang3.ObjectUtils.firstNonNull;
import static org.apache.commons.lang3.ObjectUtils.isEmpty;

@Slf4j
public class MongoOperationExtractor implements Serializable {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Worked on Mongo 5.0.6 change events
    // https://www.mongodb.com/docs/v5.0/reference/change-events/
    public MongoOperation extract(String json, String documentKeyPath) throws IOException {

        log.info("Raw JSON to extract from: {}", json);

        JsonNode rootNode = objectMapper.readTree(json);

        JsonNode clusterTimeTextNode = rootNode.get("clusterTime");
        Long clusterTimeMs = (clusterTimeTextNode != NullNode.instance)
                ? objectMapper.readTree(clusterTimeTextNode.asText()).get("$timestamp").get("t").asLong() * 1000
                : null;
        Long eventProcessedTimeMs = rootNode.get("ts_ms").asLong();

        JsonNode documentTextNode = rootNode.get("fullDocument");
        String document = (documentTextNode != NullNode.instance)
                ? objectMapper.readTree(documentTextNode.asText()).toString()
                : null;

        JsonNode documentKeyNode = objectMapper.readTree(rootNode.get("documentKey").asText());
        for (String s : documentKeyPath.split("\\.")) {
            documentKeyNode = documentKeyNode.get(s);
        }
        // When operation is "delete" then there is no document provided.
        String documentKey = documentKeyNode != null ? documentKeyNode.asText() : null;

        return MongoOperation.builder()
                .database(rootNode.get("ns").get("db").asText())
                .collection(extractCollectionName(rootNode))
                .documentKey(documentKey)
                // In some cases the cluster time will be absent,
                // so I'm taking the event processed time instead.
                // Be careful with that, as this time depends on the system time.
                .timestampMs(firstNonNull(clusterTimeMs, eventProcessedTimeMs))
                // The absence of cluster time means that this is an initial snapshot insert
                // rather than a normal insert, so I'm marking it as "snapshot".
                .type(isEmpty(clusterTimeMs) ? "snapshot" : rootNode.get("operationType").asText())
                .document(document)
                .build();
    }

    public String extractCollectionName(String json) throws Exception {
        return extractCollectionName(objectMapper.readTree(json));
    }

    private static String extractCollectionName(JsonNode node) {
        return node.get("ns").get("coll").asText();
    }
}
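
The MongoOperation value object used above is not included in this listing. Below is a minimal sketch of how it could look, inferred from the builder calls in the extractor and the getters used in the JDBC sink; it assumes Lombok, which the extractor already uses.

package software.hvb.apache.flink.cdc.example.extract;

import lombok.Builder;
import lombok.Getter;

import java.io.Serializable;

// Reconstructed sketch: fields inferred from MongoOperationExtractor.extract(...)
// and from the getters called in SyncMongoToPostgresJob.createSink(...).
@Getter
@Builder
public class MongoOperation implements Serializable {

    private final String database;
    private final String collection;
    private final String documentKey;
    private final Long timestampMs;
    private final String type;
    private final String document;
}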

Last but not least is the Docker Compose file which prepares all the necessary services.

I’m attaching it because I had some issues with connecting everything together, and I hope it will be helpful for you.

version: "2.2"
services:
  jobmanager:
    image: flink:1.19.1-java11
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
    ports:
      - "8181:8081"
  taskmanager:
    image: flink:1.19.1-java11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 8
        parallelism.default: 2
  mongo:
    image: bitnami/mongodb:5.0.6-debian-10-r35
    environment:
      - MONGODB_ROOT_PASSWORD=password
      - MONGODB_USERNAME=mongo
      - MONGODB_PASSWORD=password
      - MONGODB_DATABASE=voicebot
      - MONGODB_REPLICA_SET_MODE=primary
      - MONGODB_REPLICA_SET_NAME=rs0
      - MONGODB_REPLICA_SET_KEY=replicaPassWd
      - MONGODB_ADVERTISED_HOSTNAME=localhost
    volumes:
      - ./mongo/data/mongodb:/bitnami/mongodb
      - ./mongo/rs-initiate.js:/docker-entrypoint-initdb.d/rs-initiate.js:ro
    networks:
      - internal-network
    ports:
      - "27017:27017"
  postgres:
    image: postgres:14.2
    environment:
      - POSTGRES_PASSWORD=mysecretpassword
    volumes:
      - ./postgres/data:/var/lib/postgresql/data
    ports:
      - "5433:5432"

You can find the entire sample project in a dedicated repository on our GitHub.

Troubleshooting

In addition to the aforementioned problems with connecting to the running services, the least obvious issue I faced was picking the correct Maven dependency versions and dealing with incompatibilities between them. Even when I found a set of libraries compatible with each other, the Apache Flink runtime was still reporting errors about missing classes, which pointed me once again to incompatibilities in the Maven dependencies. A few hours later, after some trial and error, lots of sweat and several curses, I realized that the Apache Flink runtime and the Apache Flink library dependencies have to be exactly the same version. In my case I ended up on version 1.19.1.

If you need help, just ask

I did not find many examples on this topic while I was working on it, so I understand you may have some troubles or questions. Do not hesitate to ask in the comments below if you need any help.
