+669 8259 1133
+669 8259 1133

Stream Processing ด้วย Rx

25 April 2021

อะไรคือ Rx?

Rx หรือ Reactive Extension นั้น เป็นการ Implement แนวคิด Reactive Programming โดยทีมของไมโครซอฟท์เพื่อใช้งานภายใน ก่อนจะ Release เป็น Library สำหรับ C# เมื่อราวๆ 10 ปีก่อน และพัฒนาขึ้นสำหรับภาษาอื่นๆ ตามมา (คิดว่าน่าจะเป็นโปรเจค Open Source แรกๆ ของ Microsoft เลยทีเดียว)

เรื่อง Reactive Programming นี้ ผู้เขียนได้ยินครั้งแรกตอนที่ไปร่วมงาน MVP Global Summit และจำได้ขึ้นใจว่า ในห้องที่ไปนั่งฟังคนคิดเขา Present ให้ฟัง ด้วยเดโมที่เป็นภาษา C# และเปรียบเทียบตัวอย่างเป็นการเขียนจับ Event Mouse Move แบบ Reactive แทนการใช้ Event Handler แบบปกติ เลยทำให้หลายคนในห้อง รวมถึงผู้เขียนด้วย สงสัยว่า ทำไมถึงมีการคิดสร้างตัว Reactive Extension นี้ขึ้นมา เพราะว่า ภาษา C# เป็นภาษาที่มี Function Pointer (Delegate) อยู่แล้ว คือเราสามารถส่งตัวแปร ที่เก็บค่าของ Function ให้อีก Function เรียกใช้งานได้ และยังมีระบบ Event อีกด้วย คือ เราสามารถ "ลงทะเบียน" Function Pointer หลายๆ ตัวไว้ และให้ทุก Function ที่ลงทะเบียนไว้ สามารถถูกเรียกใช้งานได้ตามลำดับที่ลงทะเบียน สำหรับภาษาที่ไม่มีระบบ Event เช่น JavaScript, C++ เนี่ย การมี Rx ดูเข้าใจมากกว่า

(หมายเหตุ: DOM Event ที่เรา้ใชใน JavaScript จะต่างจาก Event ของ C# ตรงที่เราสามารถ Hijack Event ที่เคยถูกเซ็ตค่าไว้แล้วได้)

จนเมื่อตอนนี้ 10 ปี ผ่านมา เรามีการพูดถึงเรื่อง Event Driven Architecture และ Stream Processing กันมาก ผู้เขียนจึงเพิ่งจะเก็ทว่า ทำไมมันถึงถูกสร้างขึ้นมา เลยขออาสามาเล่าสู่กันฟัง

จุดเริ่มต้นของ Event Driven Architecture

ในช่วงประมาณปี 2003 นั้น การทำระบบในลักษณะ Service Oriented Architecture (SOA) และ XML Web Service กำลังเริ่มได้รับความนิยม โดย Microsoft เองก็คือผู้ที่คิดค้น SOAP Webservice ขึ้นมาเองนั่นแหละ และ Microsoft ก็ผลักดันการใช้ Web Service นี้มาก ถึงขั้นที่เราสามารถคลิกขวา Add Service Reference แล้วตัว Visual Studio 2003 สามารถ Generate Client สำหรับเรียก Web Service ขึ้นมาได้ พร้อมมี Class รองรับเสร็จสรรพ ก่อนที่จะมี OpenAPI เกิดขึ้นมาเสียอีก

ปัญหาที่เกิดตามมาก็คือ เมื่อเรามีระบบแยกกันเป็นชิ้นๆ จึงเริ่มเกิด Dependency กันระหว่าง Service ตัวอย่างเช่น Service A ไปเรียกใช้ Service B และ C ดังนั้น ถ้าหาก Service B และ C ล่ม ก็จะทำให้ Service A เหมือนล่มไปด้วย เพราะว่าไม่สามารถ Response คำตอบ กลับไปยังผู้ที่มาเรียกใช้ Service A ได้

เมื่อเกิดปัญหานี้ ธรรมชาติของคนคิด Architecture ทุกคน ก็จะต้องทำการ "Abstract" คือ แยกการคุยกันไปกันมา ของ A, B และ C นี้ออกจากกัน

แนวคิดเริ่มต้น จะเป็นระบบในลักษณะ Service Bus หรือ Message Queue มาก่อน เราเรียกกันว่า ระบบ ESB หรือ Enterprise Service Bus หลักการทำงานก็คือ แทนที่ระบบ A, B, C จะคุยกันเองโดยตรง (คำว่าโดยตรงคือหมายถึง A เปิด TCP Socket ไปหา B) ก็จะเป็นการส่งข้อความจาก A ถึง B และ A ถึง C ลงไปใน ESB แทน ส่วนตอนที่ B และ C จะตอบกลับหา A ก็จะเป็นการส่งข้อความ วางลงไปใน ESB เช่นกัน

แนวคิดของ ESB นี้ ได้เปลี่ยนลักษณะการทำงาน จากแบบ Synchronous คือ A จะทำงานเสร็จได้ ต้องรอ B ตอบ ให้กลายเป็น Asynchronous คือ A แค่บอกลอยๆ ออกมา ในสิ่งที่จะถามหรือสั่ง B ถ้า B ว่างตอบเมื่อไหร่ A จึงจะสามารถไปตอบคนที่มาสั่ง A ได้อีกทีหนึ่ง

ความท้าทายใหม่ของการทำงานแบบ Asynchronous ก็คือ เราจำเป็นจะต้องมีทางที่จะให้ Service "ทัก" หากันได้ เปรียบเทียบก็เหมือน Service A จะต้อง Tag หา Service B ได้ เมื่อเวลาต้องการให้ B รับรู้ว่า A ต้องการจะให้ B ทำงานให้ ส่วนพอ B ทำงานเสร็จแล้ว ก็จะ Tag กลับไปหา A ว่า เสร็จแล้วนะ ให้มาหยิบงานไปใช้ โดยระบบที่ทำให้ระบบของเรา สามารถ "ทัก" กันได้ง่ายๆ แบบนี้ ก็คือ Apache Kafka นั่นเอง

หลักการง่ายๆ ของ Kafka มีแค่สี่ส่วน คือ

  • Topic: คือเรื่องที่หลาย Service สนใจร่วมกัน
  • Message: คือ ข้อความหรือสิ่งที่จะต้องการสื่อสารไปยัง Topic ไม่จำกัดว่าอะไรจะอยู่ใน Message และความยาวของ Message จะยาวเท่าไหนก็ได้ (แล้วแต่ตั้งค่า)
  • Producer: คือผู้ที่ส่ง Message
  • Consumer: คือผู้ที่รับ Message

ในการทำ Architecture ลักษณะ Event Driven เราก็จะตั้ง Topic เป็น "เหตุการณ์" ที่จะเกิดขึ้นในระบบ ถ้ายกตัวอย่างเป็นร้านค้าออนไลน์ เราอาจสามารถตั้ง Topic ได้ว่า

  • NewOrder: หมายถึง มี Order ใหม่ ส่งเข้ามาจากหน้าเว็บ
  • OrderPaid: หมายถึง มี Order ที่ได้รับการชำระเงิน
  • OrderPacked: หมายถึง มี Order ที่ได้รับการ Pack แล้ว

โดยลำดับการทำงานก็อาจจะเป็นดังนี้

  • เมื่อมี Order ใหม่จากหน้าเว็บ จะมี Message ใหม่ ใน Topic NewOrder เป็นข้อมูลของ Order
  • เมื่อลูกค้าจ่ายเงินสำเร็จ จะมี Message ใหม่ ใน Topic OrderPaid เป็นข้อมูลของ Order ที่อาจจะซ้ำกับ Order ที่อยู่ใน NewOrder ก็ได้
  • ระบบ Fulfillment (ระบบบริหารการหยิบสินค้า) จะเป็น Consumer ของ Topic OrderPaid อยู่ เมื่อเห็นว่ามี Order ที่เพิ่งได้รับการชำระเงิน ก็จะทำการ Request ขอ Tracking Number จากพิมพ์ใบหยิบของ (Packing List) ออกมาทางเครื่องพิมพ์ พนักงานหยิบกระดาษ Packing List เดินไปหยิบของ แพคลงกล่อง ติดเทป แล้วกดหน้าจอ Touch Screen ที่ Station ของตัวเองว่า Pack เรียบร้อย

    ซึ่งระบบ Fulfillment จะทำการวาง Message ใหม่ลงใน Topic OrderPacked ในฐานะ Producer เพื่อ แจ้งรายละเอียดของ ข้อมูลของ Order ที่เพิ่ง Pack เสร็จ โดยมีข้อมูลของ Order ทั้งหมด และเพิ่ม
  • ระบบ Notification จะเป็น Consumer ของทั้งสาม Topic นั่นก็คือ เมื่อมี Message ใหม่เข้ามา เกี่ยวกับ Order ทำการส่ง Notification ไปยัง App ฝั่งของลูกค้า
  • ระบบ Analytics จะเป็น Consumer ของทุก Topic เมื่อมี Message ใหม่เข้ามา มันจะทำการอัพเดทข้อมูลยอดขายรายวัน ความเร็วในการแพคสินค้า

จะเห็นได้ว่า เมื่อมีระบบที่เป็นระบบกลางในการติดต่อสื่อสารกันแบบนี้ เราจึงสามารถแยกทุกระบบออกจากกันได้อย่างอิสระ ไม่จำเป็นว่าระบบ Fulfillment จะต้องมีการเรียก API ของ Apple หรือ Android เพื่อส่ง Notification และระบบจ่ายเงิน ก็อาจจะอยู่แยกกับระบบหน้า Shopping Cart ด้วย ก็ยังสามารถทำได้

โดยการที่ระบบต่างๆ ทำงาน ตาม Message ใน Topic ที่ปรากฏขึ้นเรื่อยๆ นี้ เรียกว่าเป็นการทำงานแบบ Stream Processing 

ส่วนที่เราเรียกว่ามันเป็น Stream นั้น ก็เพราะว่า Message นั้นจะไหลมาใส่ Topic เรื่อยๆ ไม่มีวันจบ และสำหรับถ้าใช้ Kafka ด้วย ก็จะสามารถพายเรือทวนน้ำ ตามน้ำ ไปหยิบ Message เก่ากว่า ใหม่กว่า ออกมาได้อีกด้วย

Stream Processing ในโปรแกรมด้วย Rx

พอพูดถึง Kafka เราเลยอาจจะมองว่าเป็นอะไรที่ใหญ่โต ใช้สำหรับระบบคุยกับระบบ หรือ Service คุยกัน แต่ถ้าลองมองกลับมาที่โปรแกรมของเรา บางทีก็อาจจะมี Use case ที่สามารถเอา Event Driven Architecture และ Stream Processing มาใช้งานด้วยก็ได้

ย้อนกลับไปที่ย่อหน้าแรก ที่ไม่มีใครเข้าใจว่า ทำไมจึงจะต้องใช้ Rx ในเมื่อ C# นั้น มีระบบ Event อยู่แล้ว ผมลองสำรวจแล้ว พบว่า มีสอง Use case ที่เราจะควรใช้ Rx แทนที่ Event ธรรมดาที่มากับ C# 

1) Producer ที่ส่ง Event ออกมานั้น เริ่มทำงานไม่พร้อมกับ Consumer

ที่พบบ่อยครั้ง ก็คือ Consumer นั้น ต้องการค่าล่าสุด จาก Producer เลยเข้าไปจับ Event แต่ถ้าหากว่า ไม่เคยเกิด Event ใหม่ขึ้นเลย หลังจากที่ Consumer จับ Event ค่านั้นก็จะไม่มีทางส่งมาถึง Consumer ได้ ในกรณีนี้ ก็คือ เรา "จับ Event ไม่ทัน" โดยสรุปคือ Message บางส่วนเกิดขึ้นก่อนที่ Consumer นั้น จะเริ่มจับ Event ขาดหายไป

ทั้งนี้ขึ้นอยู่กับโปรแกรมที่เราสร้างด้วยว่า มีความจำเป็นที่จะต้อง "ย้อนเวลา" ดู Message ตั้งแต่ Producer เริ่มทำงานหรือไม่

2) เราต้องการให้ Producer ทำงานแบบ On-Demand

Use case หนึ่งที่น่าสนใจ ก็คือ Producer ที่ใช้ทรัพยากรสูง ตัวอย่างเช่น การอ่านค่าจาก Hardware ที่ต้องติดต่อกับ Driver ภายนอก .NET ถ้าหากว่าเราต้องการจะให้ Producer นั้น มีการส่ง Stream ของ Message ออกมา ก็ต่อเมื่อ มีผู้ที่รอรับ Message อยู่เท่านั้น

ถึงแม้การใช้ระบบ Event ธรรมดา สามารถทำได้ โดยการเขียนโค๊ดอยู่ในภายใน Add/Remove ของตัว Event แต่โค๊ดการบริการจัดการ State ของตัวคลาส ก็จะกระจัดกระจายกันอยู่ สองที่ อาจจะมีปัญหากับการ Maintain ในภายหลัง นอกจานี้ การใช้ Rx จะช่วยลดภาระเรื่องการเริ่ม/หยุด Stream ที่เราต้องทำเองอีกด้วย

Subject นั้นไม่ใช่ Topic

สิ่งที่มีลักษณะคล้าย Topic บน Kafka ใน Rx นั้น จะเรียกว่า Subject ความแตกต่างระหว่าง Subject และ Topic คือ มุมมองของการใช้งาน ดังนี้

  • Topic: เปรียบเหมือนกระดานแปะ Post-it ที่ทุกคนใช้ร่วมกัน โดย Producer เป็นผู้ที่นำ Message มาใส่ลงไปใน Topic (หรือ แปะ Post-In ลงบนกระดาน) และ Consumer เป็นผู้รับ Message เหล่านั้น (คือ มาหยิบ Post-It ออกไปจากกระดาน)
  • Subject: เปรียบเหมือน Producer ที่มี Topic อยู่ในตัว คือ Subject นั้น เป็นผู้สร้าง Message และเป็นผู้ที่สามารถให้ Consumer มารับ Message ไปด้วยได้ ในตัวเดียวกัน เปรียบเทียบกัน ก็คือในกรณีของ Subject นั้น Producer เป็นคนนำเอาแผ่น Post-It ไปยื่นให้กับ Consumer โดยตรง

การที่ Subject นั้น เป็นทั้ง Topic และ Producer ในตัวเดียวกัน จึงเกิดความสับสนอย่างมากว่า ความแตกต่างระหว่าง การเขียน Class C# ที่สามารถยิง Event ออกมาได้ กับการใช้ Subject นั้น มีความแตกต่างกันอย่างไร ทำไมเราจึงต้องใช้ Subject และทำไมจึงมี Rx เกิดขึ้นซ้ำซ้อนกับ Event

เรามาดูว่า Subject ที่มีอยู่หลายชนิดนั้น มีอะไรบ้าง อาจจะทำให้เข้าใจถึงคำถามเมื่อครู่นี้มากขึ้น

Subject นั้นมีหลายชนิด

Subject นั้นมีจำนวนชนิดมากกว่านี้ แต่ขอเลือกตัวที่เราจะได้ใช้จริงๆ ออกมาเล่าให้ฟัง 3 ตัวนี้ เปรียบเทียบกัน

  • Publish: เป็น Subject ชนิดที่สามารถมี Consumer หลายตัว ต่อเข้ามาได้ แต่จะเห็นว่า ตัว PublishSubject นั้น ปล่อย Message ออกมาตามลำดับ เหมือนการจับ Event ตามปกติ ก็คือ Consumer จะได้รับ Message ที่เกิดขึ้นหลังจากที่ได้ทำการ Subscribe แล้วเท่านั้น เหมือนกับ Event ของ C#
  • Replay: เป็น PublishSubject ที่สามารถย้อนเวลา เพื่อส่ง Message ที่ Consumer ที่มาทีหลังนั้นพลาดไป ให้กับ Consumer ที่มาทีหลังได้ด้วย ตัวอย่างในภาพคือ เราตั้ง Replay ไว้ที่ 2 Message เมื่อ ReplaySubject พบ Consumer ตัวใหม่ ที่ต่อเข้ามา มันจะทำการส่ง Message 2 ชิ้นล่าสุด ไปให้ตัว Consumer ใหม่นี้ด้วย
  • Behavior: เป็น ReplaySubject ชนิดที่ตั้งค่า Replay ไว้เป็น 1 Message แต่ว่าจะแตกต่างจาก Replay ตรงที่ เราสามารถใส่ Message เริ่มต้นให้ได้ ดังนั้น ถึงแม้หลังจากการทำงานไปแล้ว ไม่เกิด Message ใดๆ ขึ้นเลย Consumer ที่ต่อเข้ามา ก็จะได้รับ Message อย่างน้อย 1 Message ในขณะที่ถ้าเป็น ReplaySubject ที่ตั้งค่า Replay ไว้ 1 จะไม่ได้รับ Message ใดๆ เลย

พออ่านถึงตรงนี้ โปรแกรมเมอร์ที่เชี่ยวชาญเรื่อง OOP ก็อาจจะออกแบบการใช้งาน Rx ได้สองแนวทางคือ

  • ทำการสร้าง Producer ใหม่ โดยการ Inherit จาก Class ReplaySubject หรือ BehaviorSubject เพื่อให้ได้ความสามารถตามที่ Rx มีให้
  • นำเอา ReplaySubject หรือ BehaviorSubject สร้างเป็นตัวแปรอยู่ใน Class แล้วเปิดเป็น Read Only Property ให้ Consumer สามารถมองเห็น และมาต่อกับมันได้ โดย Class เดิมของเรา จะทำการนำ Message ส่งไปให้ Subject นั้น เพื่อให้ Subject ส่งไปให้ Consumer ต่ออีกทีหนึ่ง

แต่ว่าการใช้ Rx ในรูปแบบนี้ (ใช้ Subject โดยตรง) เป็นแนวทางที่ผิดไปจากความต้องการของ Pattern การเขียนแบบ Reactive ตรงที่ว่า

  • Rx ต้องการให้เราเขียนแบบ Functional คือ Rx ต้องการที่จะเป็นคนจัดการ State ของ Subject เหล่านี้ให้เรา โดยเรา เป็นคนที่ระบุแค่ "วิธีการ" ในการที่จะ Produce Message ซึ่ง Rx จะทำ Factory Method ชื่อว่า Observable.Create ไว้ให้เราแล้ว ตามแนวคิดของ Functional คือ เราขยายความสามารถของ Object ที่เราสนใจ ด้วยการเพิ่ม Function ครอบไปเรื่อยๆ หรือ ส่ง "วิธีการ" ให้กัน แบบนี้

    การที่เรา Inherit จาก Subject จึงไม่ใช่วิธีการแบบ Functional แต่เป็นวิธีการแบบ OOP คือ เราไม่ได้แค่ระบุวิธีการ ด้วย Function แต่เป็นการ เปลี่ยนรูปของตัว Subject ไปเป็น Concrete Class ที่เรา Inherit ไป จากนั้น แก้ไขวิธีการทำงานของ Subject ในการ Produce Message เป็นแบบที่เราต้องการ

    ส่วนการที่เรานำเอา Subject มาเก็บไว้ภายในคลาสของเรา ก็จะเป็นวิธีการแบบ OOP อีกเช่นกัน คือเรา นั้น "หุ้ม" หรือ Compose ตัว Subject ให้มาเป็นส่วนหนึ่งของคลาสเรา แต่เรานั้น ไม่ได้กลายเป็น Subject หรือพูดได้อีกอย่างว่า เราใช้ Subject เป็น Message Queue หรือ Topic บน Kafka

    (แต่ไม่ใช่ว่า ทั้งสองแนวทางนั้น ไม่สามารถใช้ได้ เพราะว่า Rx นั้นมองว่าตัวเองเป็น Library ไม่ได้จำกัดวิธีการใช้งาน แต่เมื่อเราลองนำ Rx มาใช้ ด้วยการใช้ตัว Subject โดยตรงแบบนี้แล้ว จะพบว่า ไม่ต่างอะไรจากการสร้าง Event ธรรมดาใน C# เลย แต่แน่นอนว่า สำหรับภาษาอื่นๆ ที่ไม่มีระบบ Event แล้ว เช่น JavaScript ตัว Subject ของ Rx สามารถช่วยแก้ไขข้อจำกัดที่ตัวภาษา ไม่มี Event ได้เป็นอย่างดี)
  • Rx มี Operator ที่ใช้ในการ Configure ตัว Subject ให้เรา นั่นก็คือ หลังจากเราสร้าง Subject ด้วย Factory Method ที่ Rx มีให้แล้ว และระบุวิธีการ Produce Message แล้ว เรายังสามรถปรับให้มันกลายเป็น Subject ชนิดใดก็ได้ ไม่ว่าจะเป็น Publish, Replay หรือ Behavior ตามแนวทางแบบ Functional

ปรับเปลี่ยนคำศัพท์

ก่อนหน้านี้ ผู้เขียนใช้คำว่า Producer, Consumer และ Subject แต่ Rx นั้น เลือกใช้คำศัพท์ต่างออกไป ซึ่งออกเสียงคล้ายกัน ทำไห้เวลาอ่าน จะเกิดความสับสนได้ง่าย ผู้เขียนจึงเลือกใช้คำแบบของ Kafka ไปก่อน แต่เมื่อเราจะต้องเริ่มทำความรู้จักกับ Rx ลึกซึ้งขึ้น เลยต้องกลับมาใช้คำศัพท์ของ Rx แล้ว นั่นคือ

  • Observable - หมายถึง Topic หรือ Subject ที่มี Message ส่งออกมาเรื่อยๆ เป็น Stream โดยเมื่อเราลงมือเขียนโปรแกรมจริงด้วย Rx แล้ว ตัว Observable นั้น จะมีลักษณะการทำงานเป็น Producer ในตัวมันเองด้วย คือ มันเป็นคนที่สร้าง Message ออกมาด้วย ไม่ใช่เป็นแค่ที่พัก Message เพียงอย่างเดียว เหมือนกับ Topic ของ Kafka

    แต่ทั้งนี้ Observable นั้น ไม่ได้มีความสัมพันธ์กับ Subject แบบ 1:1 นั่นก็คือ Instance ของ Observable เดียวกัน เมื่อสั่ง Subscribe จะเป็นการสร้าง Instance ใหม่ของ Subject ออกมา เสมอ
  • Observer - หมายถึง ผู้ที่รับ Message ไปใช้งาน ซึ่งคือ Consumer นั่นเอง โดย Observer จะเป็นผู้ที่ Subscribe ไปยัง Observable
  • Sequence - หมายถึง Series ของ Message ที่ส่งออกมาจาก Producer ทั้งนี้ Sequence สามารถมีได้ทั้งแบบ Finite คือ มีจำนวนของ Message ใน Sequence ที่แน่นอน และแบบ Infinite คือ Message สามารถเกิดขึ้นได้ไม่รู้จบ

ทาง Rx เอง ได้มีการบัญญัติศัพท์ ขึ้นมาอีก 2 คำ คือ

  • Cold Observable - หมายถึง Subject (Observable) ที่เราจะเข้าไป Consume (Subscribe) นั้น จะไม่ทำงาน จนกว่าเราเข้าไป Subscribe เช่น Observable ที่อ่านข้อมูลจาก Database แล้วส่งค่าออกมาเป็น Sequence ของ Message แทนที่จะคืนค่าเป็น IEnumerable หรือ List โดยทั่วไป

    โดยความเห็นส่วนตัวของผู้เขียนแล้ว การใช้ Rx เพื่อสร้าง Cold Observable เป็นสิ่งที่ควรหลีกเลี่ยง เนื่องจากเราสามารถเขียนโค๊ดติดต่อกับ Database หรือ API ตามปกติ แล้วทำการคืนค่าเป็น IEnumerable หรือ List ได้ โดยไม่จำเป็นต้องเพิ่ม Rx เข้าไป ให้ผู้ที่มาใช้งานต่อ เกิดความสับสน
  • Hot Observable - หมายถึง Subject นั้น จะมีการส่ง Message ออกมาตลอดเวลา ไม่ว่าจะมีผู้ที่เข้าไป Subscribe หรือไม่ก็ตาม ตัวอย่างเช่น ข้อมูลจาก Sensor, ตำแหน่งของเมาส์ หรือ ข้อมูลที่ Client ส่งมาผ่าน TCP Socket มาที่เครื่อง Server เป็นต้น

    โดยความเห็นส่วนตัวของผู้เขียนแล้ว เราควรใช้ Rx เฉพาะกับ Use case ที่เราต้องการเข้าไป Consume สิ่งที่เป็น Hot Observable นี้เท่านั้น

Operator ทำให้ Rx น่าใช้กับ Hot Observable

สิ่งที่ Rx จะเข้ามาช่วยลดความยุ่งยากให้กับเรา กับ Hot Observable คือ Operator ที่ได้กล่าวถึงไว้ก่อนหน้านี้  โดย Operator ที่ Rx มีให้เราใช้นั้น น่าจะมีเกือบร้อยตัว ถ้าสนใจอยากรู้ สามารถไปดูรายการได้ที่ ReactiveX - Operators สำหรับตัวที่น่าสนใจ เพื่อการใช้งานทั่วไปกับ Hot Observable มีดังนี้

  • Publish - ใช้สำหรับการแชร์ Subscription ไปที่ Observable (Subject) ตัวเดียวกัน โดน Publish จะเป็น Observer เพียงตัวเดียว ที่ต่อไปยัง Observable ต้นทาง แล้วจึงเอาข้อมูลนั้น มากระจายต่อไปยัง Observer ของตัวมันอีกทีหนึ่ง

    จากที่เกริ่นไปก่อนหน้านี้ว่า Observable นั้น ไม่ได้มีความสัมพันธ์กับ Subject แบบ 1:1 และการที่ Observable ตั้งชื่อ Method ว่า Subscribe ทำให้คนที่มาเรียกใช้ รู้สึกเหมือนว่า Observable สามารถรับ Observer มากกว่า 1 ตัวได้ แต่ความเป็นจริงแล้ว ทุกครั้งที่เราเรียกใช้งาน Subscribe กับ Observable Instance เดิม จะเป็นการสร้าง Instance ของ Subject ใหม่ ออกมาเสมอ การเรียก Subscribe ทุกครั้ง จึงเป็นการ Subscribe ไปที่คนละ Instance ของ Subject กัน

    ดังนั้น ถ้าเราต้องการให้ Observable ทำงานแบบที่คนส่วนใหญ่เข้าใจ จึงจะต้องมี Operator Publish ต่อท้ายเสมอ และเมื่อทุกคนพร้อมแล้ว จึงสั่ง Connect() เพื่อเป็นการสั่งให้ PublishSubject ทำการต่อไปยัง Subject ต้นทาง อีกครั้ง และเริ่มการทำงาน หรือสามารถใช้ AutoConnect ต่อจาก Publish เพื่อให้ Rx ทำการ Connect ไปยัง Observable
  • RefCount - เป็น Operator ที่ทำงานต่อเนื่องมาจาก Publish โดยที่ RefCount นั้นจะทำการ Connect ไปที่ Observable เมื่อมีจำนวนคน Subscription ครบตามจำนวนที่ตั้งไว้ และจะทำการ Dispose ตัว Observable ให้ เมื่อมีจำนวน Subscriber น้อยกว่าจำนวนที่ตั้งไว้

นอกจากนี้ จะมี Operator ที่เข้ามาช่วยในการจัดการ กับ Observable ที่ปล่อย Message ออกมาเร็วเกินกว่าที่ฝั่ง Observe ได้ทัน ซึ่งมีประโยชน์มากกับเรา ดังนี้

  • Window : ทำการรวม Message จาก Observable ต้นทาง แล้วได้ออกมาเป็น Observable ตัวใหม่ สำหรับแต่ละชุดในกรอบ Window ที่กำหนด (ถ้าแปลกใจว่าทำไมต้องได้ออกมาเป็น Observable อย่าลืมว่า สำหรับ Rx นั้น จะมอง Observable เหมือนเป็น List และ Rx นั้น ใช้งานได้หลายภาษา ที่บางภาษาอาจจะไม่มีสิ่งที่เหมือนกับ List อยู่)
  • Buffer : ทำงานเหมือน Window แต่ว่า เราจะได้เป็น List ของ Message ในทุกๆ Window ที่กำหนด ซึ่งจะตรงกับความต้องการของเรามากกว่า ในภาษา C#
  • Debounce: Message ที่มาถี่เกินไป จะถูก Discard ออก กรณีที่ไม่จำเป็นจะต้อง Process ทุก Message ที่ส่งมา เนื่องจาก Message ตัวล่าสุดภายในระยะเวลาที่ระบุ ก็มีความหมายเหมือนกัน

ส่งท้าย

หวังว่าโพสนี้ จะเป็นการแนะนำให้คุณรู้จักกับ ReactiveX ได้พอสมควร ส่วนการนำไปใช้งานนั้น ตอนนี้ผู้เขียนมีการใช้งาน Rx อยู่กับโปรแกรม SystemX ที่ติดตั้งในเครื่อง LEVEL51 โดยใช้ในส่วนการอ่านค่า Sensor และการทำงานที่เป็น Interval ทั้งหมด ถ้ามีโอกาส จะนำมาเล่าให้ฟังกันครับ