
Scalability problem because empty temp dirs are not being deleted #1675

Open
dovka opened this issue Nov 1, 2020 · 10 comments

@dovka
Contributor

dovka commented Nov 1, 2020

We run two fairly large message ingestion systems with dozens of topics and millions of messages per day in total
(roughly 3M messages per day on one system and 10M per day on the other).
Secor nodes (2-4 nodes on each system) run OK when started, but their performance gets progressively worse, with CPU and disk usage growing until they crash.
(Disk usage reaches 60%, but sometimes a disk-full error is reported even though 40% of the space is still available.)


We found the root cause of this behavior.
Secor spools files to a local location set by the secor.local.path parameter.

The files are automatically purged, but the directories stay behind, reaching millions of entries on each node.
This slows down I/O access and makes each I/O operation consume more and more CPU, until Secor crashes with java.io.IOException: Mkdirs failed to create:

2020-10-29 14:04:07,413 [Thread-10] (com.pinterest.secor.consumer.Consumer:183) ERROR Thread failed
java.lang.RuntimeException: Failed to write message ParsedMessage{topic='some-topic', kafkaPartition=29, offset=11250388, kafkaKey=, payload={"@timestamp":"2020-10-29T14:04:01.157Z","beat":{"hostname":"ip-10-0-28-98","name":"ip-10-0-28-98","version":"5.0.1"},"input_type":"log","message":"{"secor":{"intermediatePath":"PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2"},"s32es":{"input_s3":{"bucket":"rem-secor-prod-eu.intermediate","key":"PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2"},"output_s3":{"bucket":"rem-secor-prod-eu.completed","key":"some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2/drsd_1c98f3c1a4c28caf93c39eacc03486e2.bml"},"elasticsearch":{"msg, timestamp=1603980241157, headers=[], mPartitions=[PATH_START, org, type, 1.3, me8, some_Inc, some_org, 2020-10-29, NaN-NaN-NaN, 2.1.0, 1601182555, 023130120130132, 1c98f3c1a4c28caf93c39eacc03486e2]}
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:252)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:159)
Caused by: java.io.IOException: Mkdirs failed to create /tmp/secor_data/64_21/some-topic/PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2 (exists=false, cwd=file:/opt/secor)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:441)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1067)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1048)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:937)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:925)
at com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory$DelimitedTextFileWriter.<init>(DelimitedTextFileReaderWriterFactory.java:112)
at com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory.BuildFileWriter(DelimitedTextFileReaderWriterFactory.java:59)
at com.pinterest.secor.util.ReflectionUtil.createFileWriter(ReflectionUtil.java:156)
at com.pinterest.secor.common.FileRegistry.getOrCreateWriter(FileRegistry.java:138)
at com.pinterest.secor.writer.MessageWriter.write(MessageWriter.java:104)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:241)
... 1 more
2020-10-29 14:04:07,417 [Thread-6] (com.pinterest.secor.consumer.Consumer:183) ERROR Thread failed
java.lang.RuntimeException: Failed to write message ParsedMessage{topic='some-topic', kafkaPartition=21, offset=11256596, kafkaKey=, payload={"@timestamp":"2020-10-29T14:04:02.250Z","beat":{"hostname":"ip-10-0-28-98","name":"ip-10-0-28-98","version":"5.0.1"},"input_type":"log","message":"{some message"}
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:252)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:159)
Caused by: java.io.FileNotFoundException: /tmp/secor_data/64_17/some_path/1601182555/032000001233320/f1bd4a1d9491ae772eafed2373bf1a41/.1_21_00000000000011256548.crc (No space left on device)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:232)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:314)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:302)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:334)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:403)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:441)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1067)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1048)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:937)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:925)
at com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory$DelimitedTextFileWriter.<init>(DelimitedTextFileReaderWriterFactory.java:112)
at com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory.BuildFileWriter(DelimitedTextFileReaderWriterFactory.java:59)
at com.pinterest.secor.util.ReflectionUtil.createFileWriter(ReflectionUtil.java:156)
at com.pinterest.secor.common.FileRegistry.getOrCreateWriter(FileRegistry.java:138)
at com.pinterest.secor.writer.MessageWriter.write(MessageWriter.java:104)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:241)
... 1 more

We delete the directories via an external command:
find /tmp/secor_data/ -type d -empty -delete
This causes both the CPU usage and the disk space growth to stabilize.

Is there any Secor parameter that affects this behavior?
One system runs version 0.26, the other 0.29; both display the same symptoms.

Here are our startup parameters:

Launching the Secor Java command line:
java -ea -Dsecor.kafka.group=secor_backup -Xmx1194m -Daws.access.key=XXXXXXXXXX -Daws.secret.key=XXXXXXXXXXXXXXXXX -Daws.region=us-east-1 -Dzookeeper.quorum=0.zk.service.consul:2181,1.zk.service.consul:2181,2.zk.service.consul:2181 -Dkafka.zookeeper.path=/ -Dkafka.seed.broker.host=kafka.service.consul -Dkafka.seed.broker.port=9092 -Dsecor.s3.bucket=mobileye.secor.intermediate -Dsecor.s3.filesystem=s3n -Dsecor.s3.path=me8 -Dsecor.kafka.topic_filter=.* -Dsecor.message.parser.class=com.pinterest.secor.parser.RsdParser -Dmessage.timestamp.name=@timestamp -Dsecor.local.path=/tmp/secor_data -Dsecor.file.reader.writer.factory=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory -Dsecor.compression.codec= -Dsecor.max.file.age.seconds=30 -Dsecor.max.file.size.bytes=50000 -Dostrich.port=19999 -Dsecor.kafka.topic_blacklist=secor.restore -Dsecor.upload.manager.class=com.pinterest.secor.uploader.S3UploadManager -Dlog4j.configuration=file:///opt/secor/log4j.prod.properties -Dconfig=secor.me.prod.backup.properties -cp secor.jar:lib/* com.pinterest.secor.main.ConsumerMain

2020-11-01 21:04:33,522 [Thread-5] (org.apache.kafka.common.config.AbstractConfig:347) INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka.service.consul:9092]
check.crcs = true
client.dns.lookup = default
client.id = ip-10-0-10-97_44_14
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = secor_backup
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Nov 3, 2020 via email

@dovka
Contributor Author

dovka commented Nov 4, 2020

Hi Henry - thank you for the answer.

#1443 doesn't look relevant,
but #1449 seems to do exactly what is needed:
"Deleting whole path, instead of files inside"

The problem is, it was merged to master on July 12, 2020 and released in the 0.29 release on July 18.
We use 0.29 and it still doesn't work (it deletes only the files, not the folders).

Am I missing something?
Thank you for your help!
David

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Nov 5, 2020 via email

@pdambrauskas
Contributor

I guess it shouldn't be related to shutdown hooks, as they are called only when secor is shutting down.
Deletion after upload:

public void deleteTopicPartition(TopicPartition topicPartition) throws IOException {

deletePath is called for each LogFilePath.

public void deletePath(LogFilePath path) throws IOException {
        TopicPartitionGroup topicPartition = new TopicPartitionGroup(path.getTopic(),
                                                           path.getKafkaPartitions());
        HashSet<LogFilePath> paths = mFiles.get(topicPartition);
        paths.remove(path);
        if (paths.isEmpty()) {
            mFiles.remove(topicPartition);
            StatsUtil.clearLabel("secor.size." + topicPartition.getTopic() + "." +
                                 topicPartition.getPartitions()[0]);
            StatsUtil.clearLabel("secor.modification_age_sec." + topicPartition.getTopic() + "." +
                                 topicPartition.getPartitions()[0]);
        }
        deleteWriter(path);
        FileUtil.delete(path.getLogFilePath());
        FileUtil.delete(path.getLogFileCrcPath());
    }

It looks like it deletes only files and keeps the directories. I guess it can't delete directories, since they are named by partitions and multiple consumer threads can write to the same partition? Maybe local files shouldn't be partitioned, so that after deletion there are no remaining folders?
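
If directory cleanup were added there, one possibility (just a sketch, not existing Secor code; the helper name and the localRoot argument are made up) would be to walk up from the deleted file's directory after the two FileUtil.delete calls and remove ancestors while they are empty, stopping at the configured secor.local.path:

import java.io.File;

// Hypothetical helper, not part of Secor: after the log and .crc files are deleted,
// prune the now-empty parent directories, stopping at the configured local root.
final class EmptyDirPruner {
    static void pruneEmptyParents(String deletedFilePath, String localRoot) {
        File root = new File(localRoot);
        File dir = new File(deletedFilePath).getParentFile();
        while (dir != null && !dir.equals(root)) {
            String[] children = dir.list();
            if (children == null || children.length > 0) {
                break;              // directory is gone or not empty: stop climbing
            }
            if (!dir.delete()) {
                break;              // another thread may have just written into it
            }
            dir = dir.getParentFile();
        }
    }
}

This would keep the per-message directories from piling up, but it shares the concurrency caveat discussed below.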

@dovka
Contributor Author

dovka commented Nov 5, 2020

Re: Log prints by Henry:

No "Shut down hook with priority" in any of the secor 0.29 logs files.

Re: pdambrauskas
This makes complete sense; this is what we are observing.
As a workaround we run the OS command

find /tmp/secor_data/ -type d -empty -delete

to delete only the empty dirs.
Can this create problems?
If not, could the equivalent be run from the deletePath function as a cleanup?

Thank you for your input, guys 👍

@pdambrauskas
Contributor

I think it is not 100% safe, since we could run into a situation where one thread is writing to a partition while another thread is deleting the folder at the same time. It is highly unlikely, but I guess it is possible, since checking whether a folder is empty and then deleting it is not an atomic operation.

My suggestion would be to name local files differently, so that multiple folders are not created, something like:

/dt=2020-01-01___1_6_00000000000115149171.parquet, and then replace ___ with / when uploading. But let's see what @HenryCaiHaiying has to say about this; maybe I'm missing something.
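
To make this concrete, a rough sketch (the class and method names are made up here, not Secor code) of how such a flat local name could be translated back into the partitioned key at upload time:

// Sketch only: a local file named dt=2020-01-01___1_6_00000000000115149171.parquet
// maps to the key dt=2020-01-01/1_6_00000000000115149171.parquet by replacing
// the "___" separator with "/" just before the upload.
final class FlatLocalNames {
    private static final String SEPARATOR = "___";

    static String toS3Key(String flatLocalName) {
        return flatLocalName.replace(SEPARATOR, "/");
    }

    static String toFlatLocalName(String s3Key) {
        return s3Key.replace("/", SEPARATOR);
    }

    public static void main(String[] args) {
        // prints: dt=2020-01-01/1_6_00000000000115149171.parquet
        System.out.println(toS3Key("dt=2020-01-01___1_6_00000000000115149171.parquet"));
    }
}

With a scheme like this, each topic-partition would produce a single flat local directory of files, so deleting the files would leave nothing behind.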

It is strange that you are not receiving the "Shut down hook with priority" log message; it should be triggered for each consumer thread. Is your logger configured to log INFO messages? Are you sure you are running version 0.29?

@dovka
Contributor Author

dovka commented Nov 5, 2020

I'm positive we run 0.29.
I was running at WARN level.
I switched to INFO, and here is the hook message at the start of the log:

2020-11-05 08:20:28,895 [main] (com.pinterest.secor.common.ShutdownHookRegistry:50) INFO Shut down hook with priority 10 added to shut down hook registry

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Nov 7, 2020 via email

@dovka
Contributor Author

dovka commented Nov 7, 2020

We create about two thousand folders per minute on each node right now. This problem usually crashes a machine in 2-3 days.
The time between a message being posted by the publisher into Kafka and Secor consuming it usually doesn't exceed 2-3 minutes,
but it can occasionally be a few hours in case of outages. Will the 5-minute delete interval create an issue in such a case?
My understanding is that Secor takes a message from Kafka, writes it to disk (creating the folder), and once the message is uploaded, it is deleted from disk, leaving the folder empty.
Am I wrong here?
How long can a message sit on the filesystem?
Why would we want to keep the empty folder? Is it ever reused?
In our naming model each S3 path (and, accordingly, each tmp path) is unique to a message.

Since the message flow is very significant, we need minutes, not days, to expire the empty folders (we use 5 minutes now).
We use cron right now, but the idea is to hide that complexity so that we won't need to manage it.
We would just set the folder deletion timeout (in whatever units you like, as long as it can be set to a period as small as 5-10 minutes) and forget it.
Of course this is only a wish; you decide how to implement it. We are just saying what would work for high-volume systems like ours.
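
To make the wish concrete, here is a rough sketch (not existing Secor code; the property name used in the comments is hypothetical) of a periodic cleanup equivalent to our find command, with a configurable interval:

import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

// Sketch of the wished-for behavior, not existing Secor code: every few minutes,
// delete empty directories under the local spool path -- the Java equivalent of
// "find /tmp/secor_data/ -type d -empty -delete". The interval could come from a
// hypothetical property such as secor.local.empty.dir.delete.interval.seconds.
public class EmptyDirCleaner {
    public static void schedule(Path localRoot, long intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try (Stream<Path> dirs = Files.walk(localRoot)) {
                dirs.filter(Files::isDirectory)
                    .filter(dir -> !dir.equals(localRoot))
                    .sorted(Comparator.reverseOrder())   // visit children before parents
                    .forEach(dir -> {
                        try {
                            Files.delete(dir);           // refuses to delete non-empty dirs
                        } catch (DirectoryNotEmptyException stillInUse) {
                            // a writer is still using it; leave it alone
                        } catch (IOException alreadyGone) {
                            // e.g. removed concurrently; ignore and move on
                        }
                    });
            } catch (IOException e) {
                // walking the tree failed; try again on the next run
            }
        }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        schedule(Paths.get("/tmp/secor_data"), 300);     // e.g. every 5 minutes
    }
}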
Thank you for your time;
we appreciate the effort you have put into creating a great open source system 👍
Have a great weekend (the rest of it),
David

@richiesgr

Hi
I would like to add some info on this problem.
I've started to get
java.lang.RuntimeException: Failed to write message max bad :11.0
at com.pinterest.secor.consumer.Consumer.handleWriteError(Consumer.java:275)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:235)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:166)
Caused by: java.io.IOException: Mkdirs failed to create file:/mnt/secor_data/message_logs/partition/1_29/prod-og-monitoring_agg_impressions_fr/dt=2021-04-04/hr=04 (exists=false, cwd=file:/opt/secor)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:441)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1067)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1048)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:295)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:569)
at com.pinterest.secor.io.impl.AvroParquetFileReaderWriterFactory$AvroParquetFileWriter.<init>(AvroParquetFileReaderWriterFactory.java:130)
at com.pinterest.secor.io.impl.AvroParquetFileReaderWriterFactory.BuildFileWriter(AvroParquetFileReaderWriterFactory.java:79)
at com.pinterest.secor.util.ReflectionUtil.createFileWriter(ReflectionUtil.java:156)
at com.pinterest.secor.common.FileRegistry.getOrCreateWriter(FileRegistry.java:138)
at com.pinterest.secor.writer.MessageWriter.write(MessageWriter.java:105)
at com.pinterest.secor.consumer.Consumer.writeMessage(Consumer.java:259)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:232)

It's not related to permissions but to the fact that Secor doesn't remove some old files (local files, not on S3). I mean, it removes some but not all of them, so the disks fill up after some time.
For example, on the file system I found files from January.
