예제별 이벤트 기반 분산 프로세스

in #kr-dev2 years ago

얼마 전에 이벤트 기반 서비스 처리에 대한 일반적인 규칙을 작성했습니다. 내 기사 Saga 및 프로세스 관리자 - 실제로 분산 프로세스를 확인하십시오 . 프로세스 관리자, 사가 및 안무의 주요 차이점을 설명했습니다. 오늘은 이벤트 소싱 및 이벤트 상점을 사용하여 문제를 해결하는 방법을 확장하고 싶습니다.

이벤트 기반 접근 방식을 사용하는 분산 프로세스는 분산 트랜잭션에서 2단계 커밋의 불가능성을 수용합니다. 모듈과 데이터베이스에 걸쳐 큰 거래를 시도하는 대신 일련의 소액 거래 를 수행 합니다. 각 작업은 정보 소스인 모듈에 의해 처리되며 자율적인 결정을 내릴 수 있습니다. 분산 프로세스는 시스템에 등록 및 게시된 이벤트(예: 장바구니 확인)에 의해 트리거됩니다. 그런 다음 다른 모듈이 구독하고 거기에서 가져올 수 있습니다. 예를 들어 주문 프로세스 시작과 같이 다음 작업이 무엇인지 알고 있습니다. 처리되는 명령을 보내고 비즈니스 로직은 또 다른 이벤트를 생성합니다. 이 이벤트는 워크플로의 다음 단계에 대한 트리거입니다. 그런 _라자냐_of event/command/event/command는 프로세스가 완료될 때까지 계속됩니다(성공 또는 실패).

라자냐

일괄 작업

분산 프로세스의 일반적인 예는 일괄 작업을 처리하는 것입니다. 예를 들어 호텔에서 단체 고객이 체크아웃하는 경우입니다. 따라서 누군가 게스트 계정 집합을 선택하고 "체크아웃"을 클릭합니다. 그 후 프로세스는 모든 항목을 확인하려고 시도합니다. 전체 프로세스가 실패하면 보상은 없지만 문제를 해결한 후 결제를 다시 실행할 수 있습니다.

Group checkout saga는 다음과 같습니다.

public class GroupCheckoutSaga {
  private final CommandBus commandBus;

  public GroupCheckoutSaga(CommandBus commandBus) {
    this.commandBus = commandBus;
  }

  public void on(GroupCheckoutInitiated groupCheckoutInitiated) {
    for (var guestAccountId : groupCheckoutInitiated.guestStayAccountIds()) {
      commandBus.send(
        new CheckoutGuestAccount(guestAccountId, groupCheckoutInitiated.groupCheckoutId())
      );
    }
    commandBus.send(
      new RecordGuestStayInitiation(groupCheckoutInitiated.groupCheckoutId(), groupCheckoutInitiated.guestStayAccountIds())
    );
  }

  public void on(GuestStayAccountEvent.GuestAccountCheckoutCompleted guestCheckoutCompleted) {
    if (guestCheckoutCompleted.groupCheckoutId() == null)
      return;

    commandBus.send(
      new RecordGuestCheckoutCompletion(
        guestCheckoutCompleted.groupCheckoutId(),
        guestCheckoutCompleted.guestStayAccountId(),
        guestCheckoutCompleted.completedAt()
      )
    );
  }

  public void on(GuestStayAccountEvent.GuestAccountCheckoutFailed guestCheckoutFailed) {
    if (guestCheckoutFailed.groupCheckoutId() == null)
      return;

    commandBus.send(
      new RecordGuestCheckoutFailure(
        guestCheckoutFailed.groupCheckoutId(),
        guestCheckoutFailed.guestStayAccountId(),
        guestCheckoutFailed.failedAt()
      )
    );
  }
}

GroupCheckoutSaga.java 에서 더 보기

Saga는 명령 버스를 통해 명령을 감지하고 명령을 내구성 있게 저장하고 전송함 패턴을 사용하여 전달되었는지 확인합니다. ESDB 및 해당 구독을 사용하는 예제 명령 버스는 다음과 같습니다.

public class ESDBCommandBus implements CommandBus {
  private static final String commandStreamId = "_commands-all";
  private final EventStoreDBClient eventStoreDBClient;
  private final EventStore eventStore;
  private final RetryPolicy retryPolicy;
  private final Supplier<String> currentCorrelationId;
  private final Supplier<String> currentCausationId;

  public ESDBCommandBus(
    EventStoreDBClient eventStoreDBClient,
    EventStore eventStore,
    RetryPolicy retryPolicy,
    Supplier<String> currentCorrelationId,
    Supplier<String> currentCausationId
  ) {
    this.eventStoreDBClient = eventStoreDBClient;
    this.eventStore = eventStore;
    this.retryPolicy = retryPolicy;
    this.currentCorrelationId = currentCorrelationId;
    this.currentCausationId = currentCausationId;
  }

  @Override
  public <Command> EventStore.AppendResult send(Command command) {
    return retryPolicy.run(ack -> {
      var result = eventStore.append(
        commandStreamId,
        new CommandEnvelope<>(command, new CommandMetadata(currentCorrelationId.get(), currentCausationId.get()))
      );

      if (!(result instanceof EventStore.AppendResult.UnexpectedFailure))
        ack.accept(result);
    });
  }

  @Override
  public void subscribe(Consumer<CommandEnvelope<Object>>... handlers) {
    subscribeToStream(eventStoreDBClient, commandStreamId, (subscription, resolvedEvent) -> {
      var commandEnvelope = deserializeCommand(resolvedEvent);

      if (commandEnvelope.isEmpty()) {
        return;
      }

      for (var handler : handlers) {
        handler.accept(commandEnvelope.get());
      }
    });
  }
}

ESDBCommandBus.java 에서 더 보기

saga 처리의 비즈니스 논리는 집계에 위임됩니다. 그 덕분에 우리는 조정(saga)과 비즈니스 로직(aggregate) 사이에 명확한 책임 분담이 있습니다. 그 덕분에 saga는 가볍고 유지 관리가 프로세스 관리자에 병합하는 것보다 훨씬 쉽습니다.

GroupCheckout 집계 에서 참조하십시오 .

잔액이 정산되면 게스트 계정을 확인할 수 있습니다. 그렇지 않은 경우 실패 이벤트를 저장합니다.

public void checkout(@Nullable UUID groupCheckoutId, OffsetDateTime now) {
  if (status != Status.Open || balance != 0) {
    enqueue(new GuestAccountCheckoutFailed(id(), groupCheckoutId, OffsetDateTime.now()));
  }
  enqueue(new GuestAccountCheckoutCompleted(id(), groupCheckoutId, now));
}

GuestStayAccount 집계 에서 더 많은 것을 보십시오 .

임의의 일시적인 오류로 인해 실패하지 않도록 명령 처리기/응용 프로그램 서비스에서 재시도 정책을 사용해야 합니다.

public ETag handle(CheckoutGuestAccount command) {
    return retryPolicy.run(ack -> {
      var result = store.getAndUpdate(
        current -> current.checkout(
          command.guestStayAccountId(),
          OffsetDateTime.now()
        ),
        command.guestStayAccountId()
      );
      ack.accept(result);
    });
  }

GuestStayAccountService 에서 더 보기

재시도 정책의 대안으로 명령 처리기에서 포켓몬 예외 처리 를 수행 한 다음 실패 이벤트를 게시할 수 있습니다.

보상이 포함된 교차 모듈 프로세스

앞의 예에서는 모든 처리가 동일한 모듈에 묶여 있었기 때문에 내부 이벤트와 외부 이벤트 사이에 명확한 분할이 없었습니다. 내부 용어를 외부 세계에 매핑할 필요가 없었습니다.

모듈 간 조정을 수행할 때 주의해야 합니다. 예를 들어 Order Saga는 Shopping Cart 모듈에 의해 시작됩니다. 그런 다음 전용 모듈에 의해 처리되는 배송 중 결제를 시작해야 합니다. 모든 내부 이벤트를 외부에 노출하면 추상화가 누출될 수 있습니다. 이벤트는 가능한 한 작아야 한다고 더 길게 썼죠? . 내부 이벤트와 외부 이벤트를 명확하게 구분한 다음 서로 매핑해야 합니다.

그렇게 하는 동안 외부 이벤트가 덜 세분화될 수 있고 전체 시스템의 범위에서 이해할 수 있어야 하므로(예: 큰 그림 Event Storming 세션의 결과에 가깝기 때문에) 보강 작업을 수행할 수 있습니다. 예제 보강은 다음과 같습니다.

public class ShoppingCartExternalEventForwarder {
  private final AggregateStore<ShoppingCart, ShoppingCartEvent, UUID> store;
  private final EventBus eventBus;

  public ShoppingCartExternalEventForwarder(
    AggregateStore<ShoppingCart, ShoppingCartEvent, UUID> store,
    EventBus eventBus
  ) {
    this.store = store;
    this.eventBus = eventBus;
  }

  public void on(ShoppingCartConfirmed event) {
    var cart = store.get(event.shoppingCartId())
      .orElseThrow(() -> new IllegalStateException("Cannot enrich event, as shopping cart with id '%s' was not found".formatted(event.shoppingCartId())));

    var externalEvent = new ShoppingCartFinalized(
      event.shoppingCartId(),
      cart.clientId(),
      cart.productItems(),
      cart.totalPrice(),
      event.confirmedAt()
    );

    eventBus.publish(externalEvent);
  }
}

ShoppingCartExternalEventForwarder.java 에서 자세한 내용을 확인하세요 .

이러한 이벤트는 일부 모듈에 있는 saga에서 처리할 수 있습니다. 예를 들어:

public class OrderSaga {
  private final CommandBus commandBus;

  public OrderSaga(CommandBus commandBus) {
    this.commandBus = commandBus;
  }

  // Happy path
  public void on(ShoppingCartFinalized event) {
    commandBus.send(
      new OrderCommand.InitializeOrder(
        event.cartId(),
        event.clientId(),
        event.productItems(),
        event.totalPrice()
      )
    );
  }

  public void on(OrderInitialized event) {
    commandBus.send(
      new PaymentCommand.RequestPayment(
        UUID.randomUUID(),
        event.orderId(), event.totalPrice()
      )
    );
  }

  public void on(PaymentExternalEvent.PaymentFinalized event) {
    commandBus.send(
      new RecordOrderPayment(
        event.orderId(),
        event.paymentId(),
        event.finalizedAt()
      )
    );
  }

  public void on(OrderPaymentRecorded event) {
    commandBus.send(
      new ShipmentCommand.SendPackage(
        event.orderId(),
        Arrays.stream(event.productItems())
          .map(pi -> new ProductItem(pi.productId(), pi.quantity()))
          .toArray(ProductItem[]::new)
      )
    );
  }

  public void on(ShipmentEvent.PackageWasSent event) {
    commandBus.send(
      new CompleteOrder(
        event.orderId()
      )
    );
  }

  // Compensation
  public void on(ShipmentEvent.ProductWasOutOfStock event) {
    commandBus.send(
      new CancelOrder(
        event.orderId(),
        OrderCancellationReason.ProductWasOutOfStock
      )
    );
  }

  public void on(OrderCancelled event) {
    if (event.paymentId() == null) {
      return;
    }
    commandBus.send(
      new PaymentCommand.DiscardPayment(
        event.paymentId(),
        DiscardReason.OrderCancelled
      )
    );
  }
}

자세히 살펴보면 부정적인 시나리오도 처리하고 있음을 알 수 있습니다. 제품이 품절된 경우 주문을 취소하라는 명령을 보내고 있습니다. 주문을 취소하면 결제가 취소됩니다(주문을 완료할 수 없는 경우 고객에게 금액을 반환해야 하므로). 이 과정을 보상이라고 합니다. Ex가 Event-Driven Design과 어떤 관련이 있습니까? 에서 그 중요성에 대해 자세히 읽어보십시오 . .

각 모듈:

전용 이벤트 스토어 스트림을 사용하여 외부 이벤트(예: 'shopping_carts__external-events' )를 게시할 수 있습니다 . 다른 모듈은 해당 스트림을 구독하고 거기에서 가져올 수 있습니다. Kafka의 토픽과 유사한 개념입니다.

출처 : https://event-driven.io/en/event_driven_distributed_processes_by_example/

Sort:  

[광고] STEEM 개발자 커뮤니티에 참여 하시면, 다양한 혜택을 받을 수 있습니다.

Coin Marketplace

STEEM 0.21
TRX 0.20
JST 0.034
BTC 98833.06
ETH 3391.57
USDT 1.00
SBD 3.08