Apache Beam dans Data Input Large Side Input

Ceci est très similaire à cette question .

Je crée un pipeline dans Dataflow 2.x qui prend les entrées en continu d’une queue Pubsub. Chaque message doit être diffusé via un dataset très volumineux fourni par Google BigQuery et auquel sont associées toutes les valeurs pertinentes (basées sur une clé) avant d’être écrites dans une firebase database.

Le problème est que le jeu de données de mappage de BigQuery est très volumineux – toute tentative d’utilisation en tant qu’entrée secondaire échoue avec les exécuteurs Dataflow générant l’erreur “exception java.lang.IllegalArgumentException: ByteSsortingng serait trop long”. J’ai essayé les stratégies suivantes:

1) entrée latérale

  • Comme indiqué, les données de mappage sont (apparemment) trop grandes pour le faire. Si je me trompe ici ou s’il y a un remède à cela, faites-le moi savoir, car ce serait la solution la plus simple.

2) Mappage de paires clé-valeur

  • Dans cette stratégie, je lis les données BigQuery et les données de message Pubsub dans la première partie du pipeline, puis je les exécute chacune via des transformations ParDo qui modifient chaque valeur des paires PCollections en KeyValue. Ensuite, j’exécute une transformation Merge.Flatten et une transformation GroupByKey pour joindre les données de mappage pertinentes à chaque message.
  • Le problème réside dans le fait que les données en streaming nécessitent une fusion avec d’autres données. Je dois donc également appliquer le fenêtrage aux grandes données BigQuery limitées. Cela nécessite également que les stratégies de fenêtrage soient les mêmes sur les deux ensembles de données. Mais aucune stratégie de fenêtrage pour les données limitées n’a de sens, et les quelques tentatives de fenêtrage que j’ai effectuées envoient simplement toutes les données BQ dans une seule fenêtre et ne les envoient plus jamais. Il doit être joint à chaque message entrant de pubsub.

3) Appeler BQ directement dans un ParDo (DoFn)

  • Cela semblait être une bonne idée – demandez à chaque travailleur de déclarer une instance statique des données cartographiques. Si ce n’est pas le cas, appelez directement BigQuery pour l’obtenir. Malheureusement, cela génère des erreurs internes de BigQuery à chaque fois (comme dans le message entier, il suffit de dire “Erreur interne”). En déposant un ticket d’assistance auprès de Google, ils m’ont dit que, essentiellement, “vous ne pouvez pas faire cela”.

Il semble que cette tâche ne corresponde pas vraiment au modèle “parallèlement embarrassant”, alors est-ce que j’aboie le mauvais arbre ici?

MODIFIER :

Même en utilisant une machine à haute mémoire dans le stream de données et en essayant de faire une entrée latérale dans une vue de carte, j’obtiens l’erreur java.lang.IllegalArgumentException: ByteSsortingng would be too long

Voici un exemple (psuedo) du code que j’utilise:

  Pipeline pipeline = Pipeline.create(options); PCollectionView<Map> mapData = pipeline .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql()) .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) .apply(View.asMap()); PCollection messages = pipeline.apply(PubsubIO.readMessages() .fromSubscription(Ssortingng.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription))); messages.apply(ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { JSONObject data = new JSONObject(new Ssortingng(c.element().getPayload())); Ssortingng key = getKeyFromData(data); TableRow sideInputData = c.sideInput(mapData).get(key); if (sideInputData != null) { LOG.info("holyWowItWOrked"); c.output(new TableRow()); } else { LOG.info("noSideInputDataHere"); } } }).withSideInputs(mapData)); 

Le pipeline déclenche l’exception et échoue avant d’enregistrer quelque chose dans ParDo .

Trace de la stack:

 java.lang.IllegalArgumentException: ByteSsortingng would be too long: 644959474+1551393497 com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.concat(ByteSsortingng.java:524) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:576) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:575) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:575) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:575) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.copyFrom(ByteSsortingng.java:559) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng$Output.toByteSsortingng(ByteSsortingng.java:1006) com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575) com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320) com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951) com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216) com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513) com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363) com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000) com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133) com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)