欧美亚洲中文,在线国自产视频,欧洲一区在线观看视频,亚洲综合中文字幕在线观看

      1. <dfn id="rfwes"></dfn>
          <object id="rfwes"></object>
        1. 站長(zhǎng)資訊網(wǎng)
          最全最豐富的資訊網(wǎng)站

          詳解PHP實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者(Kafka應(yīng)用)

          本篇文章給大家介紹PHP實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者,希望對(duì)需要的朋友有所幫助!

          前言

          PHP中使用Kafka需要RdKafka擴(kuò)展,而RdKafka依賴于librdkafka,所以這兩個(gè)我們都需要安裝,具體安裝方法自行百度,本篇不做說(shuō)明了。

          生產(chǎn)者(測(cè)試)

          創(chuàng)建消費(fèi)者需要步驟:

          • 生產(chǎn)者配置參數(shù)
          • 創(chuàng)建生產(chǎn)者實(shí)例
          • 創(chuàng)建主題實(shí)例(依賴生產(chǎn)者)
          • 生產(chǎn)主題消息
          • 推送消息

          具體代碼如下:

                  $conf = new RdKafkaConf();         // 綁定服務(wù)節(jié)點(diǎn)         $conf->set('metadata.broker.list', '127.0.0.1:32772');          // 創(chuàng)建生產(chǎn)者         $kafka = new RdKafkaProducer($conf);          // 創(chuàng)建主題實(shí)例         $topic = $kafka->newTopic('p1r1');         // 生產(chǎn)主題數(shù)據(jù),此時(shí)消息在緩沖區(qū)中,并沒(méi)有真正被推送         $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');         // 阻塞時(shí)間(毫秒), 0為非阻塞         $kafka->poll(0);           // 推送消息,如果不調(diào)用此函數(shù),消息不會(huì)被發(fā)送且會(huì)丟失         $result = $kafka->flush(5000);          if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {             throw new RuntimeException('Was unable to flush, messages might be lost!');         }

          消費(fèi)者

          創(chuàng)建一個(gè)消費(fèi)者需要幾個(gè)步驟:

          • 消費(fèi)者配置參數(shù)
          • 應(yīng)用配置參數(shù)創(chuàng)建消費(fèi)者實(shí)例
          • 訂閱對(duì)應(yīng)主題
          • 拉取數(shù)據(jù)
          • 提交位移

          具體代碼如下:

                  $conf = new RdKafkaConf();         // 綁定消費(fèi)者組         $conf->set('group.id', 'ceshi');         // 綁定服務(wù)節(jié)點(diǎn),多個(gè)用,分隔         $conf->set('metadata.broker.list', '127.0.0.1:32787');         // 設(shè)置自動(dòng)提交為false         $conf->set('enable.auto.commit', 'false');         // 設(shè)置當(dāng)前消費(fèi)者拉取數(shù)據(jù)時(shí)的偏移量, 可選參數(shù):         // earliest: 如果消費(fèi)者組是新創(chuàng)建的,從頭開(kāi)始消費(fèi),否則從消費(fèi)者組當(dāng)前消費(fèi)位移開(kāi)始。         // latest:如果消費(fèi)者組是新創(chuàng)建的,從最新偏移量開(kāi)始,否則從消費(fèi)者組當(dāng)前消費(fèi)位移開(kāi)始。         $conf->set('auto.offset.reset', 'earliest');          // 創(chuàng)建消費(fèi)者實(shí)例         $consumer = new RdKafkaKafkaConsumer($conf);         // 消費(fèi)者訂閱主題,數(shù)組形式         $consumer->subscribe(['topic1','topic2']);         while (true) {             // 消費(fèi)數(shù)據(jù),阻塞5秒(5秒內(nèi)有數(shù)據(jù)就消費(fèi),沒(méi)有數(shù)據(jù)等待5秒進(jìn)入下一輪循環(huán))             $message = $consumer->consume(5000);             switch ($message->err) {                 case RD_KAFKA_RESP_ERR_NO_ERROR:                     // 業(yè)務(wù)邏輯                     var_dump($message);                      // 提交位移                     $consumer->commit($message);                     break;                 case RD_KAFKA_RESP_ERR__PARTITION_EOF:                     echo "No more messages; will wait for moren";                     break;                 case RD_KAFKA_RESP_ERR__TIMED_OUT:                     echo "Timed outn";                     break;                 default:                     throw new Exception($message->errstr(), $message->err);                     break;             }         }         // 關(guān)閉消費(fèi)者(一般用在腳本中,不需要關(guān)閉)         $conumser->close();

          只消費(fèi)指定分區(qū)中的數(shù)據(jù):

              // 對(duì)消費(fèi)者指定分區(qū),注意此方式不能與subscribe一同使用     $consumer->assign([         new RdKafkaTopicPartition("topic", 0),         new RdKafkaTopicPartition("topic", 1),     ]);

          贊(0)
          分享到: 更多 (0)
          網(wǎng)站地圖   滬ICP備18035694號(hào)-2    滬公網(wǎng)安備31011702889846號(hào)