// ProduceRequest issues records to be created to Kafka.
//
// Kafka 0.10.0 (v2) changed Records from MessageSet v0 to MessageSet v1.
// Kafka 0.11.0 (v3) again changed Records to RecordBatch.
//
// Note that the special client ID "__admin_client" will allow you to produce
// records to internal topics. This is generally recommended if you want to
// break your Kafka cluster.
ProduceRequest => key 0, max version 9, flexible v9+
  // TransactionID is the transaction ID to use for this request, allowing for
  // exactly once semantics.
  TransactionID: nullable-string // v3+
  // Acks specifies the number of acks that the partition leaders must receive
  // from in sync replicas before considering a record batch fully written.
  //
  // Valid values are -1, 0, or 1 corresponding to all, none, or the leader only.
  //
  // Note that if no acks are requested, Kafka will close the connection
  // if any topic or partition errors to trigger a client metadata refresh.
  Acks: int16
  TimeoutMillis
  // Topics is an array of topics to send record batches to.
  Topics: [=>]
    // Topic is a topic to send record batches to.
    Topic: string
    // Partitions is an array of partitions to send record batches to.
    Partitions: [=>]
      // Partition is a partition to send a record batch to.
      Partition: int32
      // Records is a batch of records to write to a topic's partition.
      //
      // For Kafka pre 0.11.0, the contents of the byte array is a serialized
      // message set. At or after 0.11.0, the contents of the byte array is a
      // serialized RecordBatch.
      Records: nullable-bytes

// ProduceResponse is returned from a ProduceRequest.
ProduceResponse =>
  // Topics is an array of responses for the topics that batches were sent
  // to.
  Topics: [=>]
    // Topic is the topic this response pertains to.
    Topic: string
    // Partitions is an array of responses for the partitions that
    // batches were sent to.
    Partitions: [=>]
      // Partition is the partition this response pertains to.
      Partition: int32
      // ErrorCode is any error for a topic/partition in the request.
      // There are many error codes for produce requests.
      //
      // TRANSACTIONAL_ID_AUTHORIZATION_FAILED is returned for all topics and
      // partitions if the request had a transactional ID but the client
      // is not authorized for transactions.
      //
      // CLUSTER_AUTHORIZATION_FAILED is returned for all topics and partitions
      // if the request was idempotent but the client is not authorized
      // for idempotent requests.
      //
      // TOPIC_AUTHORIZATION_FAILED is returned for all topics the client
      // is not authorized to talk to.
      //
      // INVALID_REQUIRED_ACKS is returned if the request contained an invalid
      // number for "acks".
      //
      // CORRUPT_MESSAGE is returned for many reasons, generally related to
      // problems with messages (invalid magic, size mismatch, etc.).
      //
      // MESSAGE_TOO_LARGE is returned if a record batch is larger than the
      // broker's configured message.max.bytes.
      //
      // RECORD_LIST_TOO_LARGE is returned if the record batch is larger than
      // the broker's segment.bytes.
      //
      // INVALID_TIMESTAMP is returned if the record batch uses LogAppendTime
      // or if the timestamp delta from when the broker receives the message
      // is more than the broker's log.message.timestamp.difference.max.ms.
      //
      // UNSUPPORTED_FOR_MESSAGE_FORMAT is returned if using a Kafka v2 message
      // format (i.e. RecordBatch) feature (idempotence) while sending v1
      // messages (i.e. a MessageSet).
      //
      // KAFKA_STORAGE_ERROR is returned if the log directory for a partition
      // is offline.
      //
      // NOT_ENOUGH_REPLICAS is returned if all acks are required, but there
      // are not enough in sync replicas yet.
      //
      // NOT_ENOUGH_REPLICAS_AFTER_APPEND is returned on old Kafka versions
      // (pre 0.11.0.0) when a message was written to disk and then Kafka
      // noticed not enough replicas existed to replicate the message.
      //
      // DUPLICATE_SEQUENCE_NUMBER is returned for Kafka <1.1.0 when a
      // sequence number is detected as a duplicate. From 1.1.0 onward,
      // OUT_OF_ORDER_SEQUENCE_NUMBER is returned instead.
      //
      // UNKNOWN_TOPIC_OR_PARTITION is returned if the topic or partition
      // is unknown.
      //
      // NOT_LEADER_FOR_PARTITION is returned if the broker is not a leader
      // for this partition. This means that the client has stale metadata.
      //
      // INVALID_PRODUCER_EPOCH is returned if the produce request was
      // attempted with an old epoch. Either there is a newer producer using
      // the same transaction ID, or the transaction ID used has expired.
      //
      // UNKNOWN_PRODUCER_ID, added in Kafka 1.0.0 (message format v5+), is
      // returned if the producer used an ID that Kafka does not know about or
      // if the request has a larger sequence number than Kafka expects. The
      // LogStartOffset must be checked in this case. If the offset is greater
      // than the last acknowledged offset, then no data loss has occurred; the
      // client just sent data so long ago that Kafka rotated the partition out
      // of existence and no longer knows of this producer ID. In this case,
      // reset your sequence numbers to 0. If the log start offset is equal to
      // or less than what the client sent prior, then data loss has occurred.
      // See KAFKA-5793 for more details. NOTE: Unfortunately, even
      // UNKNOWN_PRODUCER_ID is unsafe to handle, so this error should likely
      // be treated the same as OUT_OF_ORDER_SEQUENCE_NUMBER. See KIP-360 for
      // more details.
      //
      // OUT_OF_ORDER_SEQUENCE_NUMBER is sent if the batch's FirstSequence was
      // not what it should be (the last FirstSequence, plus the number of
      // records in the last batch, plus one). After 1.0.0, this generally
      // means data loss. Before, there could be confusion about whether the
      // broker actually rotated the partition out of existence (this is why
      // UNKNOWN_PRODUCER_ID was introduced).
      ErrorCode: int16
      // BaseOffset is the offset that the records in the produce request began
      // at in the partition.
      BaseOffset: int64
      // LogAppendTime is the millisecond that records were appended to the
      // partition inside Kafka.
      // The value is -1 unless records were written with the log append
      // time flag (which producers cannot do).
      LogAppendTime: int64(-1) // v2+
      // LogStartOffset, introduced in Kafka 1.0.0, can be used to see if an
      // UNKNOWN_PRODUCER_ID means Kafka rotated records containing the used
      // producer ID out of existence, or if Kafka lost data.
      LogStartOffset: int64(-1) // v5+
      // ErrorRecords are indices of individual records that caused a batch
      // to error. This was added for KIP-467.
      ErrorRecords: [=>] // v8+
        // RelativeOffset is the offset of the record that caused problems.
        RelativeOffset: int32
        // ErrorMessage is the error of this record.
        ErrorMessage: nullable-string
      // ErrorMessage is the global error message of what caused this batch
      // to error.
      ErrorMessage: nullable-string // v8+
  ThrottleMillis(6) // v1+