kafka-client-0.7.0.0: Low-level Haskell client library for Apache Kafka 0.7.

Copyright: Abhinav Gupta 2015
License: MIT
Maintainer: mail@abhinavg.net
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Kafka

Description

A library to interact with Apache Kafka 0.7.

Main interface

Requests to Kafka can be made using produce, fetch, and offsets. For produce and fetch, the functions automatically decide whether the request needs to be a single Produce/Fetch request or a Multi* request.

The request operations send requests and receive responses over any type that is an instance of Transport; withConnection provides one such transport (a Socket).

withConnection :: ByteString -> Int -> (Socket -> IO a) -> IO a Source

Open a connection, execute the given operation on it, and ensure it is closed afterwards even if an exception was thrown.

withConnection "localhost" 9092 $ \conn ->
   doStuff conn
   fail "something went wrong"

Throws an IOException if we were unable to open the connection.
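
A connection failure can therefore be handled with ordinary exception machinery. A minimal sketch using Control.Exception (doStuff is a placeholder, as in the example above):

import Control.Exception (IOException, try)

main :: IO ()
main = do
  result <- try (withConnection "localhost" 9092 doStuff)
  case (result :: Either IOException ()) of
    Left _   -> putStrLn "could not connect to the broker"
    Right () -> return ()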

produce :: Transport t => t -> [Produce] -> IO () Source

Sends the given Produce requests to Kafka.

If multiple requests are supplied, a MultiProduce request is made.

withConnection "localhost" 9092 $ \conn ->
  produce conn [
     Produce (Topic "my-topic") (Partition 0) ["foo"]
   , Produce "another-topic" 0
               ["multiple", "messages"]
   ]

Note that string literals may be used in place of Topic (with the OverloadedStrings GHC extension), and integer literals may be used in place of Partition.
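
For instance, with the extension enabled the request above can be written without explicit Topic and Partition wrappers. A minimal sketch:

{-# LANGUAGE OverloadedStrings #-}

import Kafka

-- "my-topic" becomes a Topic and 0 becomes a Partition automatically.
main :: IO ()
main = withConnection "localhost" 9092 $ \conn ->
  produce conn [Produce "my-topic" 0 ["foo"]]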

fetch :: Transport t => t -> [Fetch] -> IO (Response [FetchResponse]) Source

Fetches messages from Kafka.

If multiple Fetch requests are supplied, a MultiFetch request is made.

withConnection "localhost" 9092 $ \conn -> do
  Right [FetchResponse messages newOffset] <- fetch conn [
      Fetch (Topic "test-topic") (Partition 0) (Offset 42) 1024
    ]
  {- Consume the messages here -}
  response <- fetch conn [Fetch "test-topic" 0 newOffset 1024]
  {- ... -}

Returns a list of FetchResponses in the same order as the Fetch requests. Each response contains the messages returned for the corresponding request and the new offset at which the next request for that topic-partition should be made to receive the messages that follow.

If a response for a request contains no messages, the specified topic-partition pair has been exhausted.

Note that string literals may be used in place of Topic (with the OverloadedStrings GHC extension), and integer literals may be used in place of Offset.
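
Putting this together, a simple polling loop might look like the following sketch (assumes OverloadedStrings; the topic name and the 64 KiB fetch size are illustrative, and printing each message stands in for real handling):

import qualified Data.ByteString.Char8 as BS8

-- Repeatedly fetch from "test-topic" partition 0, starting at the given
-- offset, until a response comes back with no messages.
consumeFrom :: Transport t => t -> Offset -> IO ()
consumeFrom conn offset = do
  result <- fetch conn [Fetch "test-topic" 0 offset 65536]
  case result of
    Left _ -> putStrLn "fetch failed"
    Right [FetchResponse msgs newOffset]
      | null msgs -> return ()              -- topic-partition exhausted for now
      | otherwise -> do
          mapM_ BS8.putStrLn msgs           -- replace with real message handling
          consumeFrom conn newOffset        -- continue from the new offset
    Right _ -> return ()                    -- not expected for a single Fetch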

offsets :: Transport t => t -> Offsets -> IO (Response [Offset]) Source

Retrieve message offsets from Kafka.

withConnection "localhost" 9092 $ \conn -> do
  Right [os] <- offsets conn (Offsets "topic" 0 OffsetsEarliest 1)
  fetch conn [Fetch "topic" (Partition 0) os 10] >>= doSomething

Note that string literals may be used in place of Topic (with the OverloadedStrings GHC extension), and integer literals may be used in place of Count.
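
Similarly, OffsetsLatest can be used to start reading only messages produced after the query. A minimal sketch (doSomething is a placeholder, as above):

withConnection "localhost" 9092 $ \conn -> do
  Right [latest] <- offsets conn (Offsets "topic" 0 OffsetsLatest 1)
  fetch conn [Fetch "topic" 0 latest 1024] >>= doSomething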

Types

data Produce Source

A request to send messages down a Kafka topic-partition pair.

Produce requests do not have a corresponding response. There is no way in Kafka 0.7 to know whether a message was successfully produced.

Constructors

Produce 

Fields

produceTopic :: !Topic

Kafka topic to which the messages will be sent.

producePartition :: !Partition

Partition of the topic.

produceMessages :: [ByteString]

List of message payloads.

For those concerned with low-level details: these messages will be compressed using Snappy compression.
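
The record fields documented above may also be used directly. A small sketch (assumes OverloadedStrings; the topic name and payloads are illustrative):

request :: Produce
request = Produce
  { produceTopic     = "events"
  , producePartition = 0
  , produceMessages  = ["payload-1", "payload-2"]
  }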

data Fetch Source

A request to fetch messages from a particular Kafka topic-partition pair.

FetchResponse contains responses for this kind of request.

Constructors

Fetch 

Fields

fetchTopic :: !Topic

Kafka topic from which messages will be fetched.

fetchPartition :: !Partition

Partition of the topic.

fetchOffset :: !Offset

Offset at which the fetch will start.

Kafka offloads the responsibility of knowing this to the client. That means that if the offset specified here is not the start of a real message, Kafka will spit out garbage.

Use offsets to find valid offsets.

fetchSize :: !Size

Maximum size of the returned messages.

Note that this is not the number of messages; it is the maximum combined size of the returned compressed messages.

data FetchResponse Source

Result of a single Kafka Fetch.

Constructors

FetchResponse 

Fields

fetchMessages :: [ByteString]

List of messages returned in the response for the Fetch request.

fetchNewOffset :: !Offset

Offset at which the next Fetch request for the same topic and partition should start reading in order to access the messages that follow those returned in this response.

data Offsets Source

A request to retrieve offset information from Kafka.

The response for this kind of request is a list of Offsets.

Constructors

Offsets 

Fields

offsetsTopic :: !Topic

Kafka topic from which offsets will be retrieved.

offsetsPartition :: !Partition

Partition of the topic.

offsetsTime :: !OffsetsTime

Time around which offsets will be retrieved.

If you provide a time here, keep in mind that the response will not contain the precise offset that occurred around that time. It will contain up to offsetsCount offsets in descending order: the first offset of each segment file for the specified partition whose modification time is earlier than the specified time, and possibly a "high water mark" for the last segment of the partition (if it was modified before the specified time), which is the offset at which the next message to that partition will be written.

offsetsCount :: !Count

Maximum number of offsets that will be retrieved.

data OffsetsTime Source

Different times for which offsets may be retrieved using offsets.

Constructors

OffsetsLatest

Retrieve the latest offsets.

OffsetsEarliest

Retrieve the earliest offsets.

OffsetsBefore !UTCTime

Retrieve offsets before the given time.

Keep in mind that the response will not contain the precise offset that occurred around this time. It will contain up to the requested count of offsets in descending order: the first offset of each segment file for the specified partition whose modification time is earlier than this time, and possibly a "high water mark" for the last segment of the partition (if it was modified before this time), which is the offset at which the next message to that partition will be written.
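
For example, to look up offsets for segments written before one hour ago and fetch from the most recent of them, a sketch (the one-hour window and 1024-byte fetch size are arbitrary; doSomething is a placeholder as in the examples above):

import Data.Time.Clock (addUTCTime, getCurrentTime)

withConnection "localhost" 9092 $ \conn -> do
  anHourAgo <- addUTCTime (-3600) <$> getCurrentTime
  Right (o:_) <- offsets conn (Offsets "my-topic" 0 (OffsetsBefore anHourAgo) 5)
  -- The offsets are returned in descending order, so o is the most recent.
  fetch conn [Fetch "my-topic" 0 o 1024] >>= doSomething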

newtype Topic Source

Represents a Kafka topic.

This is an instance of IsString so a literal string may be used to create a Topic with the OverloadedStrings extension.

Constructors

Topic ByteString 

newtype Offset Source

Represents an Offset in Kafka.

This is an instance of Num so a literal number may be used to create an Offset.

Constructors

Offset Word64 

newtype Partition Source

Represents a Kafka topic partition.

This is an instance of Num so a literal number may be used to create a Partition.

Constructors

Partition Word32 

newtype Size Source

Represents a size.

This is an instance of Num so a literal number may be used to create a Size.

Constructors

Size Word32 

newtype Count Source

Represents a Count.

This is an instance of Num so a literal number may be used to create a Count.

Constructors

Count Word32 

Other

data Error Source

Different errors returned by Kafka.

Constructors

UnknownError

Unknown error

OffsetOutOfRangeError

Offset requested is invalid or no longer available on the server.

InvalidMessageError

A message failed to match its checksum.

WrongPartitionError

The requested partition doesn't exist.

InvalidFetchSizeError

The maximum size requested for fetching is smaller than the message being fetched.

type Response a = Either Error a Source

A response from Kafka can either be a failure or the value that was expected.
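
A caller can pattern match on the Left case to react to specific errors. A minimal sketch (handleFetchResponse is a hypothetical helper, not part of the library; printing stands in for real handling):

import qualified Data.ByteString.Char8 as BS8

-- Print every fetched message, or report the kind of failure.
handleFetchResponse :: Response [FetchResponse] -> IO ()
handleFetchResponse response = case response of
  Right rs                   -> mapM_ (mapM_ BS8.putStrLn . fetchMessages) rs
  Left OffsetOutOfRangeError -> putStrLn "offset is invalid or no longer available"
  Left InvalidFetchSizeError -> putStrLn "fetch size too small for the next message"
  Left _                     -> putStrLn "some other Kafka error"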

Transport

class Transport t where Source

Types that provide a means to send and receive bytes.

Methods

send :: t -> ByteString -> IO () Source

Send the given ByteString down the transport.

This must block until the request has been sent in full.

recv :: t -> Int -> IO ByteString Source

Read up to the given number of bytes from the stream.

The returned ByteString may be empty if the end of the stream was reached.
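
Any type providing these two operations can be used with produce, fetch, and offsets. As an illustration only (not part of the library), an in-memory transport for testing might look like this sketch:

import qualified Data.ByteString as BS
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)

-- Records everything sent and replays a canned response on recv.
data FakeTransport = FakeTransport
  { ftSent     :: IORef [BS.ByteString]
  , ftResponse :: IORef BS.ByteString
  }

newFakeTransport :: BS.ByteString -> IO FakeTransport
newFakeTransport cannedResponse =
  FakeTransport <$> newIORef [] <*> newIORef cannedResponse

instance Transport FakeTransport where
  send t bytes = modifyIORef' (ftSent t) (bytes :)
  recv t n = do
    buf <- readIORef (ftResponse t)
    let (chunk, rest) = BS.splitAt n buf
    writeIORef (ftResponse t) rest
    return chunk    -- empty once the canned response is exhausted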
