Copyright | Abhinav Gupta 2015 |
---|---|
License | MIT |
Maintainer | mail@abhinavg.net |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Kafka
Description
A library to interact with Apache Kafka 0.7.
- withConnection :: ByteString -> Int -> (Socket -> IO a) -> IO a
- produce :: Transport t => t -> [Produce] -> IO ()
- fetch :: Transport t => t -> [Fetch] -> IO (Response [FetchResponse])
- offsets :: Transport t => t -> Offsets -> IO (Response [Offset])
- data Produce = Produce {}
- data Fetch = Fetch { fetchTopic :: !Topic, fetchPartition :: !Partition, fetchOffset :: !Offset, fetchSize :: !Size }
- data FetchResponse = FetchResponse { fetchMessages :: [ByteString], fetchNewOffset :: !Offset }
- data Offsets = Offsets {}
- data OffsetsTime
- newtype Topic = Topic ByteString
- newtype Offset = Offset Word64
- newtype Partition = Partition Word32
- newtype Size = Size Word32
- newtype Count = Count Word32
- data Error
- type Response a = Either Error a
- data Socket :: *
- class Transport t where
  - send :: t -> ByteString -> IO ()
  - recv :: t -> Int -> IO ByteString
Main interface
Requests to Kafka can be made using produce, fetch, and offsets. For produce and fetch, the functions automatically decide whether the request needs to be a single Produce/Fetch request or a Multi* request.
The request operations send requests and receive responses using any type that is an instance of Transport. withConnection produces one such object.
withConnection :: ByteString -> Int -> (Socket -> IO a) -> IO a
Open a connection, execute the given operation on it, and ensure it is closed afterwards even if an exception was thrown.
    withConnection "localhost" 9092 $ \conn -> do
        doStuff conn
        fail "something went wrong"
Throws an IOException if the connection could not be opened.
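The cleanup guarantee described above matches the usual bracket pattern. The following is a minimal sketch of that pattern, not the library's implementation: it assumes withConnection behaves like Control.Exception.bracket, and FakeConn, withFakeConnection, and stillOpenAfterFailure are hypothetical names used only for illustration.

```haskell
import Control.Exception (IOException, bracket, try)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for a connection; it only records whether it is open.
newtype FakeConn = FakeConn (IORef Bool)

-- Assumed shape of a withConnection-style helper: acquire the resource,
-- run the action, and always release, even if the action throws.
withFakeConnection :: IORef Bool -> (FakeConn -> IO a) -> IO a
withFakeConnection openFlag = bracket acquire release
  where
    acquire = writeIORef openFlag True >> pure (FakeConn openFlag)
    release (FakeConn ref) = writeIORef ref False

-- Runs a failing action and reports whether the fake connection
-- is still marked open afterwards.
stillOpenAfterFailure :: IO Bool
stillOpenAfterFailure = do
  flag <- newIORef False
  _ <- try (withFakeConnection flag (\_ -> ioError (userError "something went wrong")))
         :: IO (Either IOException ())
  readIORef flag
```

In this sketch the exception still reaches the caller (here it is caught by try), but the release step has already run by then.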
produce :: Transport t => t -> [Produce] -> IO ()
Sends the given Produce requests to Kafka.
If multiple requests are supplied, a MultiProduce request is made.
    withConnection "localhost" 9092 $ \conn ->
        produce conn [ Produce (Topic "my-topic") (Partition 0) ["foo"]
                     , Produce "another-topic" 0 ["multiple", "messages"]
                     ]
Note that string literals may be used in place of Topic (with the OverloadedStrings GHC extension), and integer literals may be used in place of Partition.
fetch :: Transport t => t -> [Fetch] -> IO (Response [FetchResponse])
Fetches messages from Kafka.
If multiple Fetch requests are supplied, a MultiFetch request is made.
    withConnection "localhost" 9092 $ \conn -> do
        Right [FetchResponse messages newOffset] <-
            fetch conn [Fetch (Topic "test-topic") (Partition 0) (Offset 42) 1024]
        {- Consume the messages here -}
        response <- fetch conn [Fetch "test-topic" 0 newOffset 1024]
        {- ... -}
Returns a list of FetchResponses in the same order as the Fetch requests. Each response contains the messages returned for the corresponding request and the offset to use in the next request for the same topic-partition pair to get the messages that follow.
If a response for a request contains no messages, the specified topic-partition pair has been exhausted.
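The offset chaining described above can be sketched as a loop that keeps fetching until a response comes back empty. To stay self-contained, this sketch takes the fetch step as a function argument instead of calling the library; drain and step are hypothetical names, not part of this module. With the real API, step would presumably wrap a call such as fetch conn [Fetch topic partition off size] and unpack the single FetchResponse.

```haskell
-- Repeatedly fetch from the given offset until a response contains no messages.
-- 'step' is a hypothetical stand-in for one fetch call: it returns either an
-- error or the messages together with the offset for the next request.
drain :: Monad m
      => (off -> m (Either err ([msg], off)))
      -> off
      -> m (Either err [msg])
drain step = go []
  where
    -- 'acc' holds the batches seen so far, newest first.
    go acc off = do
      result <- step off
      case result of
        Left e             -> pure (Left e)
        Right ([], _)      -> pure (Right (concat (reverse acc)))
        Right (msgs, next) -> go (msgs : acc) next
```

The loop stops at the first error and discards what it collected so far; a real consumer would more likely process each batch as it arrives rather than accumulate everything in memory.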
Note that string literals may be used in place of Topic (with the OverloadedStrings GHC extension), and integer literals may be used in place of Offset.
offsets :: Transport t => t -> Offsets -> IO (Response [Offset])
Retrieve message offsets from Kafka.
    withConnection "localhost" 9092 $ \conn -> do
        Right [os] <- offsets conn (Offsets "topic" 0 OffsetsEarliest 1)
        fetch conn [Fetch "topic" (Partition 0) os 10] >>= doSomething
Note that string literals may be used in place of Topic (with the OverloadedStrings GHC extension), and integer literals may be used in place of Count.
Types
data Produce
A request to send messages down a Kafka topic-partition pair.
Produce requests do not have a corresponding response. There is no way of knowing in Kafka 0.7 if a message was successfully Produced.
Constructors
Produce
data Fetch
A request to fetch messages from a particular Kafka topic-partition pair.
FetchResponse contains responses for this kind of request.
Constructors
Fetch
Fields
fetchTopic :: !Topic
fetchPartition :: !Partition
fetchOffset :: !Offset
fetchSize :: !Size
data FetchResponse
Result of a single Kafka Fetch.
Constructors
FetchResponse
Fields
fetchMessages :: [ByteString]
fetchNewOffset :: !Offset
data Offsets
A request to retrieve offset information from Kafka.
The response for this kind of request is a list of Offsets.
Constructors
Offsets
data OffsetsTime
Different times for which offsets may be retrieved using offsets.
Constructors
OffsetsLatest | Retrieve the latest offsets. |
OffsetsEarliest | Retrieve the earliest offsets. |
OffsetsBefore !UTCTime | Retrieve offsets before the given time. Keep in mind that the response will not contain the precise offset that occurred around this time. It will return up to the specified count of offsets in descending order, each being the first offset of a segment file for the specified partition with a modified time less than this time, and possibly a "high water mark" for the last segment of the partition (if it was modified before this time), which specifies the offset at which the next message to that partition will be written. |
newtype Topic
Represents a Kafka topic.
This is an instance of IsString so a literal string may be used to create a Topic with the OverloadedStrings extension.
Constructors
Topic ByteString |
newtype Offset
Represents an Offset in Kafka.
This is an instance of Num so a literal number may be used to create an Offset.
newtype Partition
Represents a Kafka topic partition.
This is an instance of Num so a literal number may be used to create a Partition.
newtype Size
Represents a size.
This is an instance of Num so a literal number may be used to create a Size.
newtype Count
Represents a count.
This is an instance of Num so a literal number may be used to create a Count.
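The literal support described for these newtypes comes from the IsString and Num classes. The sketch below shows the mechanism on local stand-ins that mirror the documented declarations; the instance definitions are assumptions about how such instances might be written, not the library's source, and exampleTopic and examplePartition are hypothetical names.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.ByteString (ByteString)
import Data.String (IsString (..))
import Data.Word (Word32)

-- Local stand-ins mirroring the documented newtypes.
newtype Topic = Topic ByteString
  deriving (Show, Eq)

-- Deriving Num through the wrapped Word32 lets integer literals be used directly.
newtype Partition = Partition Word32
  deriving (Show, Eq, Num)

-- With OverloadedStrings, a string literal becomes a call to fromString.
instance IsString Topic where
  fromString = Topic . fromString

-- Both literals elaborate to the wrapped values without naming the constructors.
exampleTopic :: Topic
exampleTopic = "my-topic"

examplePartition :: Partition
examplePartition = 0
```

One consequence of a Num instance is that arithmetic such as examplePartition + 1 also type checks, which is convenient for offsets but may be surprising for identifiers like partitions.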
Other
data Error
Different errors returned by Kafka.
Constructors
UnknownError | Unknown error |
OffsetOutOfRangeError | Offset requested is invalid or no longer available on the server. |
InvalidMessageError | A message failed to match its checksum. |
WrongPartitionError | The requested partition doesn't exist. |
InvalidFetchSizeError | The maximum size requested for fetching is smaller than the message being fetched. |
type Response a = Either Error a
A response from Kafka can either be a failure or the value that was expected.
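Because Response is an Either, callers can handle failures explicitly instead of relying on a partial pattern such as Right [...] <- .... The sketch below declares local copies of Error and Response as documented on this page so that it compiles on its own (the deriving clause is added for the sketch only); describe and explain are hypothetical helpers, not part of the library.

```haskell
-- Local copies of the documented types, so the sketch compiles standalone.
data Error
  = UnknownError
  | OffsetOutOfRangeError
  | InvalidMessageError
  | WrongPartitionError
  | InvalidFetchSizeError
  deriving (Show, Eq)

type Response a = Either Error a

-- Hypothetical helper: a short human-readable message for each error.
describe :: Error -> String
describe UnknownError          = "unknown error"
describe OffsetOutOfRangeError = "offset is invalid or no longer available"
describe InvalidMessageError   = "a message failed its checksum"
describe WrongPartitionError   = "the requested partition does not exist"
describe InvalidFetchSizeError = "fetch size is smaller than the message"

-- Hypothetical helper: keep successes, replace errors with their description.
explain :: Response a -> Either String a
explain = either (Left . describe) Right
```

For InvalidFetchSizeError in particular, a caller might retry the same offset with a larger Size rather than give up.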
Transport
class Transport t where
Types that provide a means to send and receive bytes.
Methods
send :: t -> ByteString -> IO ()
Send the given ByteString down the transport.
This must block until the request has been finished.
recv :: t -> Int -> IO ByteString
Read up to the given number of bytes from the stream.
The returned ByteString may be empty if the end of the stream was reached.
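Since the request functions accept any Transport, a test double can stand in for a socket. The following is a minimal sketch of an in-memory instance. The class is redeclared locally so the block compiles on its own; MemTransport, newMemTransport, and sentBytes are hypothetical names, and in real code the instance would be written against the class exported by this module.

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Local copy of the documented class, so the sketch compiles standalone.
class Transport t where
  send :: t -> ByteString -> IO ()
  recv :: t -> Int -> IO ByteString

-- Hypothetical in-memory transport: 'sent' collects outgoing bytes and
-- 'incoming' holds canned bytes for 'recv' to hand back.
data MemTransport = MemTransport
  { sent     :: IORef ByteString
  , incoming :: IORef ByteString
  }

instance Transport MemTransport where
  -- Append the bytes to everything sent so far.
  send t bytes = modifyIORef' (sent t) (<> bytes)
  -- Hand back up to n bytes; an empty result means the canned input ran out.
  recv t n = atomicModifyIORef' (incoming t) $ \buf ->
    let (chunk, rest) = BS.splitAt n buf in (rest, chunk)

-- Create a transport whose 'recv' will serve the given bytes.
newMemTransport :: ByteString -> IO MemTransport
newMemTransport canned = MemTransport <$> newIORef BS.empty <*> newIORef canned

-- Inspect everything that has been sent so far.
sentBytes :: MemTransport -> IO ByteString
sentBytes = readIORef . sent
```

A test could preload newMemTransport with an encoded response, run a request against it, and then compare sentBytes with the expected request bytes.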