// node-fetch v2 is required here; v3 is ESM-only and cannot be loaded with require().
const kafka = require('kafka-node');
const fetch = require('node-fetch');

const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({ kafkaHost: '10.11.xx.xxx:9093' });
const offset = new kafka.Offset(client);

const topic = 'testing';
const partition = 0;

// Look up the latest offset, then consume starting from the last message in the partition.
offset.fetchLatestOffsets([topic], function (err, offsets) {
  if (err) {
    // A value returned from this callback is discarded, so log the error instead.
    console.error(err);
    return;
  }

  const latestOffset = offsets[topic][partition];
  console.log(`latest offset: ${latestOffset}`);

  const consumer = new Consumer(
    client,
    // Start one message before the end; clamp at 0 so an empty partition does not yield -1.
    [{ topic: topic, partition: partition, offset: Math.max(latestOffset - 1, 0) }],
    { fromOffset: true, autoCommit: true }
  );

  // Forward every consumed message as a notification.
  consumer.on('message', function (message) {
    console.log(message);

    const data = {
      title: 'Kafka Consumer',
      content: `Kafka Topic: ${message.topic}`,
      description: [
        `Offset: ${message.offset}`,
        `highWaterOffset: ${message.highWaterOffset}`,
        `timestamp: ${message.timestamp}`
      ],
      created: (new Date()).toString(),
      from: 'Apache Consumer',
      link: { text: 'Go Kafka', url: 'http://10.11.xx.xxx:3030' },
    };

    sendToExchange(data);
  });

  // Report consumer errors as urgent notifications.
  consumer.on('error', function (err) {
    console.log(err);

    const data = {
      title: 'Kafka Consumer Error',
      content: `Kafka Error: ${err.message}`,
      created: (new Date()).toString(),
      from: 'Apache Consumer',
      type: 'urgent',
      link: { text: 'Go Kafka', url: 'http://10.11.xx.xxx:3030' },
    };

    sendToExchange(data);
  });

  consumer.on('offsetOutOfRange', function (err) {
    console.log(err);
  });
});

// Publish a notification through the GraphQL sendToExchange mutation.
function sendToExchange(value) {
  const url = "http://10.11.xx.xxx:x000/v1/graphql";
  const exchange = "electron.notification.admin";
  const routingKey = "dba";
  const query = `mutation sendToExchange($exchange: String!, $routingKey: String, $message: String!) {
    sendToExchange(exchange: $exchange, routingKey: $routingKey, message: $message)
  }`;

  // The mutation takes the message as a String, so serialize the payload.
  // `created` is refreshed here and overrides any value set by the caller.
  const message = JSON.stringify({ ...value, created: (new Date()).toString() });

  fetch(url, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Accept": "application/json"
    },
    body: JSON.stringify({
      query: query,
      variables: { exchange, routingKey, message }
    })
  })
    .then(r => r.json())
    .then(data => {
      console.log(`data returned: ${JSON.stringify(data)}`);
    })
    .catch(err => console.log(err.message));
}