Ceci est très similaire à cette question .
Je crée un pipeline dans Dataflow 2.x qui prend les entrées en continu d’une queue Pubsub. Chaque message doit être diffusé via un dataset très volumineux fourni par Google BigQuery et auquel sont associées toutes les valeurs pertinentes (basées sur une clé) avant d’être écrites dans une firebase database.
Le problème est que le jeu de données de mappage de BigQuery est très volumineux – toute tentative d’utilisation en tant qu’entrée secondaire échoue avec les exécuteurs Dataflow générant l’erreur “exception java.lang.IllegalArgumentException: ByteSsortingng serait trop long”. J’ai essayé les stratégies suivantes:
1) entrée latérale
2) Mappage de paires clé-valeur
3) Appeler BQ directement dans un ParDo (DoFn)
Il semble que cette tâche ne corresponde pas vraiment au modèle “parallèlement embarrassant”, alors est-ce que j’aboie le mauvais arbre ici?
MODIFIER :
Même en utilisant une machine à haute mémoire dans le stream de données et en essayant de faire une entrée latérale dans une vue de carte, j’obtiens l’erreur java.lang.IllegalArgumentException: ByteSsortingng would be too long
Voici un exemple (psuedo) du code que j’utilise:
Pipeline pipeline = Pipeline.create(options); PCollectionView<Map> mapData = pipeline .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql()) .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) .apply(View.asMap()); PCollection messages = pipeline.apply(PubsubIO.readMessages() .fromSubscription(Ssortingng.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription))); messages.apply(ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { JSONObject data = new JSONObject(new Ssortingng(c.element().getPayload())); Ssortingng key = getKeyFromData(data); TableRow sideInputData = c.sideInput(mapData).get(key); if (sideInputData != null) { LOG.info("holyWowItWOrked"); c.output(new TableRow()); } else { LOG.info("noSideInputDataHere"); } } }).withSideInputs(mapData));
Le pipeline déclenche l’exception et échoue avant d’enregistrer quelque chose dans ParDo
.
Trace de la stack:
java.lang.IllegalArgumentException: ByteSsortingng would be too long: 644959474+1551393497 com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.concat(ByteSsortingng.java:524) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:576) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:575) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:575) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.balancedConcat(ByteSsortingng.java:575) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng.copyFrom(ByteSsortingng.java:559) com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteSsortingng$Output.toByteSsortingng(ByteSsortingng.java:1006) com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575) com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320) com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951) com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216) com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513) com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363) com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000) com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133) com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)